-
Notifications
You must be signed in to change notification settings - Fork 3
/
ch-frb-l1.cpp
1579 lines (1282 loc) · 64.4 KB
/
ch-frb-l1.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Major features missing:
//
// - Distributed logging is not integrated
// - If anything goes wrong, the L1 server will crash!
//
// The L1 server can run in two modes: either a "production-scale" mode with 20 cores and either 8 or 16 beams,
// or a "subscale" mode with (nbeams <= 4) and no core-pinning.
//
// The "production-scale" mode is hardcoded to assume the NUMA setup of the CHIMEFRB L1 nodes:
// - Dual CPU
// - 10 cores/cpu
// - Hyperthreading enabled
// - all NICs on the same PCI-E bus as the first CPU
// - all SSDs on the first CPU.
//
// Note that the Linux scheduler defines 40 "cores":
// cores 0-9: primary hyperthread on CPU1
// cores 10-19: primary hyperthread on CPU2
// cores 20-29: secondary hyperthread on CPU1
// cores 30-39: secondary hyperthread on CPU2
#include <cerrno>
#include <cstring>
#include <fstream>
#include <functional>
#include <iomanip>
#include <memory>
#include <thread>
#include <unordered_map>

#include <sys/types.h>
#include <sys/socket.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <curl/curl.h>

#include <ch_frb_io_internals.hpp>
#include <rf_pipelines.hpp>
#include <bonsai.hpp>
#include <l1-rpc.hpp>
#include <l1-prometheus.hpp>

#include "ch_frb_l1.hpp"
#include "chlog.hpp"
// "cxxopts.hpp" requires G++ >= 4.9
#if __GNUC__ > 4 || \
(__GNUC__ == 4 && __GNUC_MINOR__ >= 9)
#define HAVE_CXXOPTS 1
#else
#define HAVE_CXXOPTS 0
#endif
#if HAVE_CXXOPTS
#include "cxxopts.hpp"
#endif
using namespace std;
using namespace ch_frb_l1;
// Shorthand for a movable/unlockable lock on a std::mutex.
using ulock = std::unique_lock<std::mutex>;

// -------------------------------------------------------------------------------------------------
//
// l1_config: reads and parses config files, does a lot of sanity checking,
// but does not allocate any "heavyweight" data structures.
struct l1_config
{
    l1_config() { }
    l1_config(int argc, char **argv);

    // Command-line arguments
    string l1_config_filename;
    string rfi_config_filename;
    string bonsai_config_filename;
    string l1b_config_filename;

    Json::Value rfi_transform_chain_json;
    bonsai::config_params bonsai_config;

    // Command-line flags
    bool tflag = false;
    bool fflag = false;
    bool l1b_pipe_io_debug = false;
    bool memory_pool_debug = false;
    bool write_chunk_debug = false;
    bool deliberately_crash = false;
    bool ignore_end_of_stream = false;

    // Currently, l1_verbosity can have the following values (I may add more later):
    //   1: pretty quiet
    //   2: pretty noisy
    int l1_verbosity = 1;

    // nstreams is automatically determined by the number of (ipaddr, port) pairs.
    // There will be one (network_thread, assembler_thread, rpc_server) triple for each stream.
    int nbeams = 0;
    int nfreq = 0;
    int nstreams = 0;
    int nt_per_packet = 0;
    int fpga_counts_per_sample = 384;
    int nt_align = 0;   // used to align stream of assembled_chunks to RFI/dedispersion block size.

    // The number of frequencies in the downsampled RFI chain.
    // Must match the number of downsampled frequencies in the RFI chain JSON file. (probably 1024)
    int nrfifreq = 0;

    // If slow_kernels=false (the default), the L1 server will use fast assembly language kernels
    // for its packet processing.  If slow_kernels=true, then it will use reference kernels which
    // are much slower.
    //
    // Note 1: the slow kernels are too slow for non-subscale use!  If slow kernels are used on
    // the "production-scale" L1 server with (nbeams, nupfreq) = (16, 16), it may crash.
    //
    // Note 2: the fast kernels can only be used if certain conditions are met.  As of this writing,
    // the conditions are: (a) nupfreq must be even, and (b) nt_per_packet must be 16.  In particular,
    // for a subscale run with nupfreq=1, the fast kernels can't be used.
    //
    // Conditions (a) and (b) could be generalized by writing a little more code, if this would be useful
    // then let me know!
    bool slow_kernels = false;

    // Both vectors have length nstreams.
    vector<string> ipaddr;
    vector<int> port;

    // One L1-RPC per stream
    vector<string> rpc_address;

    // One L1-prometheus per stream
    vector<string> prometheus_address;

    // Optional chlog logging server address
    string logger_address;

    // Optional URL to get frame0-ctime
    string frame0_url;

    // Timeout (in ms) for retrieving frame0-ctime.
    // Initialized (like the other members) so that a default-constructed l1_config has no
    // indeterminate fields; the value matches the default used when reading the yaml file.
    int frame0_timeout = 3000;

    // Size of RFI mask measurement ringbuffer (15 seconds required for prometheus; more could be
    // useful for other monitoring tools).  Default matches the yaml-file default.
    int rfi_mask_meas_history = 300;

    // A vector of length nbeams, containing the beam_ids that will be processed on this L1 server.
    // It is currently assumed that these are known in advance and never change!
    // If unspecified, 'beam_ids' defaults to { 0, ..., nbeams-1 }.
    vector<int> beam_ids;

    // Buffer sizes, as specified in config file.
    int unassembled_ringbuf_nsamples = 4096;
    int assembled_ringbuf_nsamples = 8192;
    vector<int> telescoping_ringbuf_nsamples;
    double write_staging_area_gb = 0.0;

    // "Derived" unassembled ringbuf parameters.
    int unassembled_ringbuf_capacity = 0;
    int unassembled_nbytes_per_list = 0;
    int unassembled_npackets_per_list = 0;

    // "Derived" assembled and telescoping ringbuf parameters.
    int assembled_ringbuf_nchunks = 0;
    vector<int> telescoping_ringbuf_nchunks;

    // Parameters of the memory_slab_pool(s)
    int nbytes_per_memory_slab = 0;   // size (in bytes) of memory slab used to store assembled_chunk
    int total_memory_slabs = 0;       // total for node

    // List of output devices (e.g. '/ssd', '/nfs')
    vector<string> output_device_names;

    // If 'intensity_prescale' is specified, then all intensity values will be multiplied by its value.
    // This is a workaround for 16-bit overflow issues in bonsai.  When data is saved to disk, the
    // prescaling will not be applied.
    float intensity_prescale = 1.0;

    // L1b linkage.  Note: assumed L1b command line is:
    //   <l1b_executable_filename> <l1b_config_filename> <beam_id>
    string l1b_executable_filename;
    bool l1b_search_path = false;      // will $PATH be searched for executable?
    int l1b_buffer_nsamples = 0;       // determines buffer size between L1a and L1b (0 = system default)
    double l1b_pipe_timeout = 0.0;     // timeout in seconds between L1a and L1b
    bool l1b_pipe_blocking = false;    // setting this to true is equivalent to a very large l1b_pipe_timeout

    // "Derived" parameter: L1b pipe capacity (derived from l1b_buffer_nsamples)
    int l1b_pipe_capacity = 0;

    // Forces RFI removal and dedispersion to run in separate threads (shouldn't
    // need to specify this explicitly, except for debugging).
    bool force_asynchronous_dedispersion = false;

    // Occasionally useful for debugging: If track_global_trigger_max is true, then when
    // the L1 server exits, it will print the (DM, arrival time) of the most significant FRB.
    bool track_global_trigger_max = false;

    // Also intended for debugging.  If the optional parameter 'stream_acqname' is
    // specified, then the L1 server will auto-stream all chunks to disk.  Warning:
    // it's very easy to use a lot of disk space this way!
    string stream_devname;            // specified in config file, options are "ssd" or "nfs", defaults to "ssd"
    string stream_acqname;            // specified in config file, defaults to "", which results in no streaming acquisition
    vector<int> stream_beam_ids;      // specified in config file, defaults to all beam ID's on node
    string stream_filename_pattern;   // derived from 'stream_devname' and 'stream_acqname'

    // Called after warnings have been printed: continues if -f was given, otherwise exits.
    void _have_warnings() const;
};
#if HAVE_CXXOPTS
#else
static void usage()
{
cerr << "Usage: ch-frb-l1 [-fvipmwct] <l1_config.yaml> <rfi_config.json> <bonsai_config.hdf5> <l1b_config_file>\n"
<< " -f forces the L1 server to run, even if the config files look fishy\n"
<< " -v increases verbosity of the toplevel ch-frb-l1 logic\n"
<< " -i ignores end-of-stream packets\n"
<< " -p enables a very verbose debug trace of the pipe I/O between L1a and L1b\n"
<< " -m enables a very verbose debug trace of the memory_slab_pool allocation\n"
<< " -w enables a very verbose debug trace of the logic for writing chunks\n"
<< " -c deliberately crash dedispersion thread (for debugging, obviously)\n"
<< " -t starts a \"toy server\" which assembles packets, but does not run RFI removal,\n"
<< " dedispersion, or L1B (if -t is specified, then the last 3 arguments are optional)\n";
exit(2);
}
#endif
// FIXME: split this monster constructor into multiple functions for readability?
l1_config::l1_config(int argc, char **argv)
{
const int nfreq_c = ch_frb_io::constants::nfreq_coarse_tot;
vector<string> args;
vector<int> acq_beams;
string acq_name;
#if HAVE_CXXOPTS
cxxopts::Options parser("ch-frb-l1", "CHIME FRB L1 server");
parser.positional_help("<l1_config.yaml> [<rfi_config.json> <bonsai_config.hdf5> <l1b_config_file>]");
parser.add_options()
("h,help", "Help")
("v,verbose", "Increases verbosity of the toplevel ch-frb-l1 logic")
("f,force", "Forces the L1 server to run, even if the config files look fishy")
("p,pipe", "Enables a very verbose debug trace of the pipe I/O between L1a and L1b")
("i,ignore", "Ignores end-of-stream packets")
("m,memory", "Enables a very verbose debug trace of the memory_slab_pool allocation")
("w,write", "Eables a very verbose debug trace of the logic for writing chunks")
("c,crash", "Deliberately crash dedispersion thread (for debugging, obviously)")
("t,toy", "Starts a \"toy server\" which assembles packets, but does not run RFI removal, dedispersion, or L1B (if -t is specified, then the last 3 arguments are optional)")
("a,acq", "Stream data to disk, saving it to this acquisition directory name", cxxopts::value<std::string>(acq_name))
("n,nfs", "For streaming data acquisition, stream to NFS, not SSD")
("b,beam", "For streaming data acquisition, beam number to capture (can be repeated; default is all beams)", cxxopts::value<vector<int> >(acq_beams), "<beam number>")
("positional", "Positional parameters", cxxopts::value<std::vector<std::string>>(args))
;
parser.parse_positional({"positional"});
auto opts = parser.parse(argc, argv);
if (opts.count("v"))
this->l1_verbosity = 2;
if (opts.count("f"))
this->fflag = true;
if (opts.count("i"))
this->ignore_end_of_stream = true;
if (opts.count("p"))
this->l1b_pipe_io_debug = true;
if (opts.count("m"))
this->memory_pool_debug = true;
if (opts.count("w"))
this->write_chunk_debug = true;
if (opts.count("c"))
this->deliberately_crash = true;
if (opts.count("t"))
this->tflag = true;
if (opts.count("help") || (args.size() == 0) || (!((args.size() == 4) || ((args.size() == 1) && (this->tflag)))) ){
std::cout << parser.help({""}) << endl;
exit(0);
}
this->l1_config_filename = args[0];
if (args.size() == 4) {
this->rfi_config_filename = args[1];
this->bonsai_config_filename = args[2];
this->l1b_config_filename = args[3];
}
#else
// Low-budget command line parsing
for (int i = 1; i < argc; i++) {
if (argv[i][0] != '-') {
args.push_back(argv[i]);
continue;
}
for (int j = 1; argv[i][j] != 0; j++) {
if (argv[i][j] == 'v')
this->l1_verbosity = 2;
else if (argv[i][j] == 'f')
this->fflag = true;
else if (argv[i][j] == 'p')
this->l1b_pipe_io_debug = true;
else if (argv[i][j] == 'i')
this->ignore_end_of_stream = true;
else if (argv[i][j] == 'm')
this->memory_pool_debug = true;
else if (argv[i][j] == 'w')
this->write_chunk_debug = true;
else if (argv[i][j] == 'c')
this->deliberately_crash = true;
else if (argv[i][j] == 't')
this->tflag = true;
else
usage();
}
}
if (args.size() == 4) {
this->l1_config_filename = args[0];
this->rfi_config_filename = args[1];
this->bonsai_config_filename = args[2];
this->l1b_config_filename = args[3];
}
else if (tflag && (args.size() == 1))
this->l1_config_filename = args[0];
else
usage();
#endif
if (!tflag) {
// Open rfi_config file.
std::ifstream rfi_config_file(rfi_config_filename);
if (rfi_config_file.fail())
throw runtime_error("ch-frb-l1: couldn't open file " + rfi_config_filename);
// Parse rfi_config file and initialize 'rfi_transform_chain_json'.
Json::Reader rfi_config_reader;
if (!rfi_config_reader.parse(rfi_config_file, this->rfi_transform_chain_json))
throw runtime_error("ch-frb-l1: couldn't parse json file " + rfi_config_filename);
// Throwaway call, to get an early check that rfi_config_file is valid.
// FIXME bind() here?
auto rfi_chain = rf_pipelines::pipeline_object::from_json(rfi_transform_chain_json);
#if 0
// FIXME pretty-print rfi_chain
if (l1_verbosity >= 2) {
cout << rfi_config_filename << ": " << rfi_chain.size() << " transforms\n";
for (unsigned int i = 0; i < rfi_chain.size(); i++)
cout << rfi_config_filename << ": transform " << i << "/" << rfi_chain.size() << ": " << rfi_chain[i]->name << "\n";
}
#endif
// Parse bonsai_config file and initialize 'bonsai_config'.
this->bonsai_config = bonsai::config_params(bonsai_config_filename);
if (l1_verbosity >= 2) {
bool write_derived_params = true;
string prefix = bonsai_config_filename + ": ";
bonsai_config.write(cout, write_derived_params, prefix);
}
// Check that the bonsai config file contains all transfer matrices.
// This will be the case if it is the output of 'bonsai-mkweight'.
bool have_transfer_matrices = true;
for (int itree = 0; itree < bonsai_config.ntrees; itree++)
if (!bonsai_config.transfer_matrices[itree])
have_transfer_matrices = false;
if (!have_transfer_matrices) {
throw runtime_error(bonsai_config_filename + ": transfer matrices not found. Maybe you accidentally specified a .txt file"
+ " instead of .hdf5? See ch_frb_l1/MANUAL.md for more info");
}
}
// Remaining code in this function reads l1_config yaml file.
int yaml_verbosity = (this->l1_verbosity >= 2) ? 1 : 0;
yaml_paramfile p(l1_config_filename, yaml_verbosity);
// These parameters can be read right away.
this->nbeams = p.read_scalar<int> ("nbeams");
this->nfreq = p.read_scalar<int> ("nfreq");
this->nt_per_packet = p.read_scalar<int> ("nt_per_packet");
this->fpga_counts_per_sample = p.read_scalar<int> ("fpga_counts_per_sample", 384);
this->nt_align = p.read_scalar<int> ("nt_align");
this->nrfifreq = p.read_scalar<int> ("nrfifreq");
this->ipaddr = p.read_vector<string> ("ipaddr");
this->port = p.read_vector<int> ("port");
this->rpc_address = p.read_vector<string> ("rpc_address");
this->prometheus_address = p.read_vector<string> ("prometheus_address");
this->logger_address = p.read_scalar<string> ("logger_address", "");
this->frame0_url = p.read_scalar<string> ("frame0_url");
this->frame0_timeout = p.read_scalar<int> ("frame0_timeout_ms", 3000);
this->rfi_mask_meas_history = p.read_scalar<int>("rfi_mask_meas_history", 300);
this->slow_kernels = p.read_scalar<bool> ("slow_kernels", false);
this->unassembled_ringbuf_nsamples = p.read_scalar<int> ("unassembled_ringbuf_nsamples", 4096);
this->assembled_ringbuf_nsamples = p.read_scalar<int> ("assembled_ringbuf_nsamples", 8192);
this->telescoping_ringbuf_nsamples = p.read_vector<int> ("telescoping_ringbuf_nsamples", {});
this->write_staging_area_gb = p.read_scalar<double> ("write_staging_area_gb", 0.0);
this->output_device_names = p.read_vector<string> ("output_devices");
this->intensity_prescale = p.read_scalar<float> ("intensity_prescale", 1.0);
this->l1b_executable_filename = tflag ? p.read_scalar<string> ("l1b_executable_filename","") : p.read_scalar<string> ("l1b_executable_filename");
this->l1b_search_path = p.read_scalar<bool> ("l1b_search_path", false);
this->l1b_buffer_nsamples = p.read_scalar<int> ("l1b_buffer_nsamples", 0);
this->l1b_pipe_timeout = p.read_scalar<double> ("l1b_pipe_timeout", 0.0);
this->l1b_pipe_blocking = p.read_scalar<bool> ("l1b_pipe_blocking", false);
this->force_asynchronous_dedispersion = p.read_scalar<bool> ("force_asynchronous_dedispersion", false);
this->track_global_trigger_max = p.read_scalar<bool> ("track_global_trigger_max", false);
// Create the map from network interface names ("eno2") to IP address.
unordered_map<string, string> interfaces;
// Get list of network interfaces...
{
struct ifaddrs *ifaces, *iface;
if (getifaddrs(&ifaces)) {
throw runtime_error("Failed to get network interfaces -- getifaddrs(): " + string(strerror(errno)));
}
for (iface = ifaces; iface; iface = iface->ifa_next) {
//chlog("Network interface: " << iface->ifa_name);
//if (string(iface->ifa_name) != ipaddr[i]) {
//continue;
//}
struct sockaddr* address = iface->ifa_addr;
if (address->sa_family != AF_INET) {
//chlog("not INET");
continue;
}
struct sockaddr_in* inaddress = reinterpret_cast<struct sockaddr_in*>(address);
struct in_addr addr = inaddress->sin_addr;
char* addrstring = inet_ntoa(addr);
chlog("Network interface: " << iface->ifa_name << " has IP " << addrstring);
//chlog("Found match with IP address: " << addrstring);
interfaces[string(iface->ifa_name)] = string(addrstring);
//ipaddr[i] = string(addrstring);
//break;
}
freeifaddrs(ifaces);
}
// Convert network interface names in "ipaddr", eg, "eno2", into the interface's IP address.
for (size_t i=0; i<ipaddr.size(); i++) {
// Try to parse as dotted-decimal IP address
struct in_addr inaddr;
if (inet_aton(ipaddr[i].c_str(), &inaddr) == 1) {
// Correctly parsed as dotted-decimal IP address.
continue;
}
// If doesn't parse as dotted-decimal, lookup in interfaces mapping.
auto val = interfaces.find(ipaddr[i]);
if (val == interfaces.end()) {
throw runtime_error("Config file ipaddr entry \"" + ipaddr[i] + "\" was not dotted IP address and was not one of the known network interfaces");
}
chlog("Mapped IP addr " << ipaddr[i] << " to " << val->second);
ipaddr[i] = val->second;
}
// Convert network interface names in "rpc_address" entries.
for (size_t i=0; i<rpc_address.size(); i++) {
// "tcp://eno2:5555" -> "tcp://10.7.100.15:5555"
size_t proto = rpc_address[i].find("//");
if (proto == std::string::npos)
continue;
size_t port = rpc_address[i].rfind(":");
if (port == std::string::npos)
continue;
string host = rpc_address[i].substr(proto + 2, port - (proto+2));
//chlog("RPC address host: \"" << host << "\"");
auto val = interfaces.find(host);
if (val != interfaces.end()) {
string new_addr = rpc_address[i].substr(0, proto+2) + val->second + rpc_address[i].substr(port);
chlog("Mapping RPC address " << rpc_address[i] << " to " << new_addr);
rpc_address[i] = new_addr;
}
}
// Convert network interface names in "prometheus_address" entries.
for (size_t i=0; i<prometheus_address.size(); i++) {
// "eno2:8888" -> "10.7.100.15:8888"
// "8888" -> "8888"
size_t port = prometheus_address[i].find(":");
if (port == std::string::npos)
continue;
string host = prometheus_address[i].substr(0, port);
chlog("Prometheus address host: \"" << host << "\"");
auto val = interfaces.find(host);
if (val != interfaces.end()) {
string new_addr = val->second + prometheus_address[i].substr(port);
chlog("Mapping Prometheus address " << prometheus_address[i] << " to " << new_addr);
prometheus_address[i] = new_addr;
}
}
// Lots of sanity checks.
// First check that we have a consistent 'nstreams'.
if ((ipaddr.size() == 1) && (port.size() > 1))
this->ipaddr = vector<string> (port.size(), ipaddr[0]);
else if ((ipaddr.size() > 1) && (port.size() == 1))
this->port = vector<int> (ipaddr.size(), port[0]);
if (ipaddr.size() != port.size())
throw runtime_error(l1_config_filename + ": expected 'ip_addr' and 'port' to be lists of equal length");
this->nstreams = ipaddr.size();
// More sanity checks..
if (nbeams <= 0)
throw runtime_error(l1_config_filename + ": 'nbeams' must be >= 1");
if (nrfifreq < 0)
throw runtime_error(l1_config_filename + ": 'nrfifreq' must be positive (or zero), and must match the number of downsampled frequencies in the RFI chain JSON file -- probably 1024.");
if (!tflag && (nfreq != bonsai_config.nfreq))
throw runtime_error("ch-frb-l1: 'nfreq' values in l1 config file and bonsai config file must match");
if (nfreq <= 0)
throw runtime_error(l1_config_filename + ": 'nfreq' must be >= 1");
if (nfreq % nfreq_c)
throw runtime_error(l1_config_filename + ": 'nfreq' must be a multiple of " + to_string(nfreq_c));
if (nfreq > 16384)
throw runtime_error(l1_config_filename + ": nfreq > 16384 is currently not allowed");
if (nstreams <= 0)
throw runtime_error(l1_config_filename + ": 'ip_addr' and 'port' must have length >= 1");
if (nt_per_packet <= 0)
throw runtime_error(l1_config_filename + ": 'nt_per_packet' must be >= 1");
if (fpga_counts_per_sample <= 0)
throw runtime_error(l1_config_filename + ": 'fpga_counts_per_sample' must be >= 1");
if ((nt_align < 0) || (nt_align % ch_frb_io::constants::nt_per_assembled_chunk))
throw runtime_error(l1_config_filename + ": 'nt_align' must be a multiple of " + to_string(ch_frb_io::constants::nt_per_assembled_chunk));
if (rpc_address.size() != (unsigned int)nstreams)
throw runtime_error(l1_config_filename + ": 'rpc_address' must be a list whose length is the number of (ip_addr,port) pairs");
if (prometheus_address.size() != (unsigned int)nstreams)
throw runtime_error(l1_config_filename + ": 'prometheus_address' must be a list whose length is the number of (ip_addr,port) pairs");
if (!slow_kernels && (nt_per_packet != 16))
throw runtime_error(l1_config_filename + ": fast kernels (slow_kernels=false) currently require nt_per_packet=16");
if (!slow_kernels && (nfreq % (2*nfreq_c)))
throw runtime_error(l1_config_filename + ": fast kernels (slow_kernels=false) currently require nfreq divisible by " + to_string(2*nfreq_c));
if (l1b_buffer_nsamples < 0)
throw runtime_error(l1_config_filename + ": l1b_buffer_nsamples must be >= 0");
if (l1b_pipe_timeout < 0.0)
throw runtime_error(l1_config_filename + ": l1b_pipe_timeout must be >= 0.0");
if (unassembled_ringbuf_nsamples <= 0)
throw runtime_error(l1_config_filename + ": 'unassembled_ringbuf_nsamples' must be >= 1");
if (assembled_ringbuf_nsamples <= 0)
throw runtime_error(l1_config_filename + ": 'assembled_ringbuf_nsamples' must be >= 1");
if (telescoping_ringbuf_nsamples.size() > 4)
throw runtime_error(l1_config_filename + ": 'telescoping_ringbuf_nsamples' must be a list of length <= 4");
if ((telescoping_ringbuf_nsamples.size() > 0) && (telescoping_ringbuf_nsamples[0] < assembled_ringbuf_nsamples))
throw runtime_error(l1_config_filename + ": if specified, 'telescoping_ringbuf_nsamples[0]' must be >= assembled_ringbuf_nsamples");
if (write_staging_area_gb < 0.0)
throw runtime_error(l1_config_filename + ": 'write_staging_area_gb' must be >= 0.0");
for (unsigned int i = 0; i < telescoping_ringbuf_nsamples.size(); i++) {
if (telescoping_ringbuf_nsamples[i] <= 0)
throw runtime_error(l1_config_filename + ": all elements of 'telescoping_ringbuf_nsamples' must be > 0");
}
if (nbeams % nstreams != 0) {
throw runtime_error(l1_config_filename + ": nbeams (=" + to_string(nbeams) + ") must be a multiple of nstreams (="
+ to_string(nstreams) + ", inferred from number of (ipaddr,port) pairs");
}
// Read beam_ids (postponed to here, so we get the check on 'nbeams' first).
this->beam_ids = p.read_vector<int> ("beam_ids", vrange(0,nbeams));
if (beam_ids.size() != (unsigned)nbeams)
throw runtime_error(l1_config_filename + ": 'beam_ids' must have length 'nbeams'");
// Read stream params (postponed to here, so we get 'beam_ids' first).
// If a stream is specified on the command-line, override the config file.
#if HAVE_CXXOPTS
if (opts.count("acq")) {
if (acq_name == "none") {
// no streaming!
} else {
this->stream_devname = opts.count("nfs") ? "nfs" : "ssd";
this->stream_acqname = acq_name;
this->stream_beam_ids = acq_beams;
}
} else {
this->stream_devname = p.read_scalar<string> ("stream_devname", "ssd");
this->stream_acqname = p.read_scalar<string> ("stream_acqname", "");
this->stream_beam_ids = p.read_vector<int> ("stream_beam_ids", this->beam_ids);
}
#else
this->stream_devname = p.read_scalar<string> ("stream_devname", "ssd");
this->stream_acqname = p.read_scalar<string> ("stream_acqname", "");
this->stream_beam_ids = p.read_vector<int> ("stream_beam_ids", this->beam_ids);
#endif
for (int b: stream_beam_ids)
if (!vcontains(beam_ids, b))
throw runtime_error(l1_config_filename + ": 'stream_beam_ids' must be a subset of 'beam_ids' (which defaults to [0,...,nbeams-1] if unspecified)");
// "Derived" unassembled ringbuf params.
int fp = nt_per_packet * fpga_counts_per_sample; // FPGA counts per packet
int np = int(40000/fp) + 1; // number of packets in 100 ms, rounded up. This will correspond to one unassembled packet list.
int nb = ch_frb_io::intensity_packet::packet_size(nbeams/nstreams, 1, nfreq/nfreq_c, nt_per_packet);
this->unassembled_ringbuf_capacity = int(unassembled_ringbuf_nsamples / (np * nt_per_packet)) + 1;
this->unassembled_nbytes_per_list = np * nfreq_c * nb;
this->unassembled_npackets_per_list = np * nfreq_c;
if (l1_verbosity >= 2) {
cout << l1_config_filename << ": setting unassembled_ringbuf_capacity=" << unassembled_ringbuf_capacity << endl
<< l1_config_filename << ": setting unassembled_nbytes_per_list=" << unassembled_nbytes_per_list << endl
<< l1_config_filename << ": setting unassembled_npackets_per_list=" << unassembled_npackets_per_list << endl;
}
// "Derived" assembled and telescoping ringbuf params.
int nt_c = ch_frb_io::constants::nt_per_assembled_chunk;
this->assembled_ringbuf_nchunks = (assembled_ringbuf_nsamples + nt_c - 1) / nt_c;
this->assembled_ringbuf_nchunks = max(assembled_ringbuf_nchunks, 2);
if ((l1_verbosity >= 2) && (assembled_ringbuf_nsamples != assembled_ringbuf_nchunks * nt_c)) {
cout << l1_config_filename << ": assembled_ringbuf_nsamples increased from "
<< assembled_ringbuf_nsamples << " to " << (assembled_ringbuf_nchunks * nt_c)
<< " (rounding up to multiple of ch_frb_io::nt_per_assembled_chunk)" << endl;
}
int nr = telescoping_ringbuf_nsamples.size();
this->telescoping_ringbuf_nchunks.resize(nr);
for (int i = 0; i < nr; i++) {
nt_c = (1 << i) * ch_frb_io::constants::nt_per_assembled_chunk;
this->telescoping_ringbuf_nchunks[i] = (telescoping_ringbuf_nsamples[i] + nt_c - 1) / nt_c;
this->telescoping_ringbuf_nchunks[i] = max(telescoping_ringbuf_nchunks[i], 2);
if ((l1_verbosity >= 2) && (telescoping_ringbuf_nsamples[i] != telescoping_ringbuf_nchunks[i] * nt_c)) {
cout << l1_config_filename << ": telescoping_ringbuf_nsamples[" << i << "] increased from "
<< telescoping_ringbuf_nsamples[i] << " to " << (telescoping_ringbuf_nchunks[i] * nt_c)
<< " (rounding up to multiple of ch_frb_io::nt_per_assembled_chunk)" << endl;
}
}
// Memory slab parameters (nbytes_per_memory_slab, total_memory_slabs)
//
// The total memory usage consists of
// - live_chunks_per_beam (active + assembled_ringbuf + telescoping_ringbuf)
// - temporary_chunks_per_stream (temporaries in assembled_chunk::_put_assembled_chunk())
// - total_staging_chunks (derived from config param 'write_staging_area_gb')
int nupfreq = xdiv(nfreq, nfreq_c);
this->nbytes_per_memory_slab = ch_frb_io::assembled_chunk::get_memory_slab_size(nupfreq, nt_per_packet, this->nrfifreq);
int live_chunks_per_beam = 2; // "active" chunks
live_chunks_per_beam += assembled_ringbuf_nchunks; // assembled_ringbuf
// telescoping_ringbuf
for (unsigned int i = 0; i < telescoping_ringbuf_nchunks.size(); i++)
live_chunks_per_beam += telescoping_ringbuf_nchunks[i];
// probably overkill, but this fudge factor accounts for the fact that the dedispersion
// thread can briefly hang on to a reference to the assembled_chunk.
int fudge_factor = 4;
if (telescoping_ringbuf_nchunks.size() > 0)
fudge_factor = max(4 - telescoping_ringbuf_nchunks[0], 0);
live_chunks_per_beam += fudge_factor;
int temporary_chunks_per_stream = max(1, (int)telescoping_ringbuf_nchunks.size());
int total_staging_chunks = pow(2,30.) * write_staging_area_gb / nbytes_per_memory_slab;
this->total_memory_slabs = nbeams * live_chunks_per_beam + nstreams * temporary_chunks_per_stream + total_staging_chunks;
if (l1_verbosity >= 1) {
double gb = total_memory_slabs * double(nbytes_per_memory_slab) / pow(2.,30.);
cout << "Total assembled_chunk memory on node: " << gb << " GB"
<< " (chunk counts: " << nbeams << "*" << live_chunks_per_beam
<< " + " << nstreams << "*" << temporary_chunks_per_stream
<< " + " << total_staging_chunks << ")" << endl;
}
// l1b_pipe_capacity
if (!tflag && (l1b_buffer_nsamples > 0)) {
int nt_chunk = bonsai_config.nt_chunk;
int nchunks = (l1b_buffer_nsamples + nt_chunk - 1) / nt_chunk;
if ((l1_verbosity >= 2) && (l1b_buffer_nsamples != nchunks * nt_chunk)) {
cout << l1_config_filename << ": increasing l1b_buffer_nsamples: "
<< l1b_buffer_nsamples << " -> " << (nchunks * nt_chunk)
<< " (rounding up to multiple of bonsai_nt_chunk)" << endl;
}
// Base capacity for config_params + a little extra for miscellaneous metadata...
this->l1b_pipe_capacity = bonsai_config.serialize_to_buffer().size() + 1024;
// ... plus capacity for coarse-grained triggers.
for (int itree = 0; itree < bonsai_config.ntrees; itree++)
this->l1b_pipe_capacity += nchunks * bonsai_config.ntriggers_per_chunk[itree] * sizeof(float);
if (l1_verbosity >= 2)
cout << l1_config_filename << ": l1b pipe_capacity will be " << l1b_pipe_capacity << " bytes" << endl;
}
// Warnings that can be overridden with -f.
bool have_warnings = false;
if (!p.check_for_unused_params(false)) // fatal=false
have_warnings = true;
if (nrfifreq == 0) {
cout << "Warning: nrfifreq was set to zero (or not set) -- RFI masks will not be saved in callback data!" << endl;
have_warnings = true;
}
if ((l1b_executable_filename.size() > 0) && (l1b_buffer_nsamples == 0) && (l1b_pipe_timeout <= 1.0e-6)) {
cout << l1_config_filename << ": should specify either l1b_buffer_nsamples > 0, or l1b_pipe_timeout > 0.0, see MANUAL.md for discussion." << endl;
have_warnings = true;
}
if (!tflag && (bonsai_config.nfreq > 4096) && slow_kernels) {
cout << l1_config_filename << ": nfreq > 4096 and slow_kernels=true, presumably unintentional?" << endl;
have_warnings = true;
}
if (have_warnings)
_have_warnings();
// I put this last, since it creates directories.
this->stream_filename_pattern = ch_frb_l1::acqname_to_filename_pattern(stream_devname, stream_acqname, vrange(1,nstreams+1), stream_beam_ids);
}
// Called after one or more configuration warnings have been printed.
//
// Without the -f flag (this->fflag == false), the warnings are fatal: an explanatory
// message is printed and the process exits with status 1. With -f, a note is printed
// and the function returns normally so that startup continues.
void l1_config::_have_warnings() const
{
    if (!this->fflag) {
        cout << "ch-frb-l1: the above warning(s) are treated as fatal. To force the L1 server to run anyway, use ch-frb-l1 -f." << endl;
        exit(1);
    }

    cout << "ch-frb-l1: the above warning(s) will be ignored, since the -f flag was specified." << endl;
}
// -------------------------------------------------------------------------------------------------
//
// Dedispersion thread context and main().
//
// Note: the 'ibeam' member is an index satisfying 0 <= ibeam < config.nbeams,
// where config.nbeams is the number of beams on the node. It is not a beam_id!
// (The corresponding beam_id is config.beam_ids[ibeam].)
//
// Note: the 'l1b_subprocess' member can be an empty pointer (when the
// L1 server is run without L1B).
// Everything one dedispersion thread needs. One context is passed to each thread
// (see dedispersion_thread_main()). Member order is left unchanged because callers
// may construct this aggregate with brace initialization.
struct dedispersion_thread_context {
// The thread's own copy of the L1 configuration (held by value).
l1_config config;
// Network stream that chunks for this beam are read from. Also handed to the last
// mask_counter as attrs.chime_stream when RFI masks are requested.
shared_ptr<ch_frb_io::intensity_network_stream> sp;
// Registry that receives each mask_counter's ring buffer, keyed by (beam_id, where).
shared_ptr<mask_stats_map> ms_map;
// Callback invoked once by _thread_main() before the pipeline runs, with
// (ibeam, dedisperser, first latency-monitoring stage, last latency-monitoring stage).
std::function<void(int, shared_ptr<const bonsai::dedisperser>,
shared_ptr<const rf_pipelines::pipeline_object> latency_pre,
shared_ptr<const rf_pipelines::pipeline_object> latency_post
)> set_bonsai;
// If non-empty, it is added as a trigger processor on the dedisperser and waited on
// (wait_for_child()) after the pipeline finishes.
shared_ptr<bonsai::trigger_pipe> l1b_subprocess; // warning: can be empty pointer!
// Cores the thread is pinned to (via ch_frb_io::pin_thread_to_cores) before any allocation.
vector<int> allowed_cores;
// If true, sets bonsai::dedisperser::initializer::asynchronous.
bool asynchronous_dedispersion; // run RFI and dedispersion in separate threads?
// Beam index on this node, 0 <= ibeam < config.nbeams (asserted). Not a beam_id.
int ibeam;
// Real RFI + dedispersion pipeline; requires config.tflag == false.
void _thread_main() const;
// Toy chunk-reading loop; requires config.tflag == true.
void _toy_thread_main() const; // if -t command-line argument is specified
// Helper function called in _thread_main(), during initialization
void _init_mask_counters(const shared_ptr<rf_pipelines::pipeline_object> &pipeline, int beam_id) const;
};
void dedispersion_thread_context::_init_mask_counters(const shared_ptr<rf_pipelines::pipeline_object> &pipeline, int beam_id) const
{
vector<shared_ptr<rf_pipelines::mask_counter_transform>> mask_counters;
bool clippers_after_mask_counters = false;
// This lambda-function is passed to rf_pipelines::visit_pipeline(),
// to find the mask_counters, and test whether clippers occur after mask_counters.
auto find_mask_counters = [&mask_counters, &clippers_after_mask_counters]
(const shared_ptr<rf_pipelines::pipeline_object> &p, int depth)
{
// A transform is considered to be a clipper if its class_name contains the substring "clipper".
// FIXME: "feels" like a hack, is there a better criterion?
bool is_clipper = (p->class_name.find("clipper") != string::npos);
if (is_clipper) {
if (mask_counters.size() > 0)
clippers_after_mask_counters = true;
return;
}
auto counter = dynamic_pointer_cast<rf_pipelines::mask_counter_transform> (p);
if (counter) {
// cout << "Found mask counter: " << counter->where << endl;
clippers_after_mask_counters = false;
mask_counters.push_back(counter);
}
};
rf_pipelines::visit_pipeline(find_mask_counters, pipeline);
if (config.nrfifreq > 0) {
if (mask_counters.size() == 0)
throw runtime_error("ch-frb-l1: RFI masks requested, but no mask_counters in the RFI config JSON file");
if (clippers_after_mask_counters)
throw runtime_error("ch-frb-l1: RFI masks requested, and clippers occur after the last mask_counter in the RFI config JSON file");
}
chlog("Setting up " << mask_counters.size() << " mask counters");
for (unsigned int i = 0; i < mask_counters.size(); i++) {
rf_pipelines::mask_counter_transform::runtime_attrs attrs;
attrs.ringbuf_nhistory = config.rfi_mask_meas_history;
attrs.chime_beam_id = beam_id;
// If RFI masks are requested, then the last mask_counter in the chain fills assembled_chunks.
if ((config.nrfifreq > 0) && ((i+1) == mask_counters.size()))
attrs.chime_stream = this->sp;
mask_counters[i]->set_runtime_attrs(attrs);
this->ms_map->put(beam_id, mask_counters[i]->where, mask_counters[i]->ringbuf);
}
}
// Note: only called if config.tflag == false.
//
// Body of one real-time dedispersion thread. It builds the pipeline
//   chime_network_stream -> RFI chain (from config.rfi_transform_chain_json) -> bonsai dedisperser,
// reports the dedisperser and latency-monitoring stages through set_bonsai(), then runs
// the pipeline until it returns. Afterwards it optionally prints the global max trigger
// and waits for the L1B child process.
//
// The statement order below matters (thread pinning before allocation, mask counters
// configured before the run, set_bonsai() before run()). Do not reorder casually.
void dedispersion_thread_context::_thread_main() const
{
assert(!config.tflag);
assert(ibeam >= 0 && ibeam < config.nbeams);
// Pin thread before allocating anything.
ch_frb_io::pin_thread_to_cores(allowed_cores);
// Note: deep copy here, to get thread-local copy of transfer matrices!
bonsai::config_params bonsai_config = config.bonsai_config.deep_copy();
// Note: the distinction between 'ibeam' and 'beam_id' is a possible source of bugs!
int beam_id = config.beam_ids[ibeam];
// Source stage: reads this beam's data from the shared network stream 'sp'.
auto stream = rf_pipelines::make_chime_network_stream(sp, beam_id, config.intensity_prescale);
auto rfi_chain = rf_pipelines::pipeline_object::from_json(config.rfi_transform_chain_json);
bonsai::dedisperser::initializer ini_params;
ini_params.fill_rfi_mask = true; // very important for real-time analysis!
ini_params.analytic_variance_on_the_fly = false; // prevent accidental initialization from non-hdf5 config file (should have been checked already, but another check can't hurt)
ini_params.allocate = true; // redundant, but I like making it explicit
ini_params.verbosity = 0;
if (asynchronous_dedispersion) {
ini_params.asynchronous = true;
// The following line is now commented out.
//
// ini_params.async_allowed_cores = allowed_cores;
//
// Previously, I was including this, even though it should be redundant given the call to pin_thread_to_cores() above.
// However, it appears that this is not safe, since std::hardware_concurrency() sometimes returns 1 after the first call
// to pin_thread_to_cores(). It is very strange that this only happens sometimes! But commenting out the line above
// seems to fix it.
}
auto dedisperser = make_shared<bonsai::dedisperser> (bonsai_config, ini_params); // not config.bonsai_config
// Trigger processors.
// Optional: track the most significant trigger over the whole run (reported after run()).
shared_ptr<bonsai::global_max_tracker> max_tracker;
if (config.track_global_trigger_max) {
max_tracker = make_shared<bonsai::global_max_tracker> ();
dedisperser->add_processor(max_tracker);
}
// Optional: forward triggers to the L1B subprocess (may be an empty pointer).
if (l1b_subprocess)
dedisperser->add_processor(l1b_subprocess);
auto bonsai_transform = rf_pipelines::make_bonsai_dedisperser(dedisperser);
// Configure mask_counters in the RFI chain (may throw runtime_error on a bad RFI config).
_init_mask_counters(rfi_chain, beam_id);
auto pipeline = make_shared<rf_pipelines::pipeline> ();
pipeline->add(stream);
pipeline->add(rfi_chain);
pipeline->add(bonsai_transform);
// Latency monitoring spans from the network stream to the last object visited in the RFI chain.
shared_ptr<rf_pipelines::pipeline_object> latency1 = stream;
shared_ptr<rf_pipelines::pipeline_object> latency2;
// Keeps overwriting latency2, so after the visit it holds the last object visited.
// NOTE(review): this presumably is the final RFI transform, assuming visit_pipeline
// visits in pipeline order -- confirm against rf_pipelines.
auto find_last_transform = [&latency2]
(const shared_ptr<rf_pipelines::pipeline_object> &p, int depth) {
latency2 = p;
};
rf_pipelines::visit_pipeline(find_last_transform, rfi_chain);
cout << "RFI chain:" << endl;
rf_pipelines::print_pipeline(rfi_chain);
cout << "Found first stage for latency monitoring: " << latency1->name << endl;
// NOTE(review): latency2 is dereferenced unconditionally; this assumes visit_pipeline
// invoked the callback at least once -- TODO confirm.
cout << "Found last stage for latency monitoring: " << latency2->name << endl;
set_bonsai(ibeam, dedisperser, latency1, latency2);
rf_pipelines::run_params rparams;
rparams.outdir = ""; // disables (presumably: empty outdir means rf_pipelines writes no output files -- TODO confirm)
rparams.verbosity = 0;
// FIXME more sensible synchronization scheme!
// Blocks until the pipeline finishes.
pipeline->run(rparams);
if (max_tracker) {
// Build the whole line first and emit it in one write, so output from other beam threads does not interleave mid-line.
stringstream ss;
ss << "ch-frb-l1: beam_id=" << beam_id
<< ": most significant FRB has SNR=" << max_tracker->global_max_trigger
<< ", and (dm,arrival_time)=(" << max_tracker->global_max_trigger_dm
<< "," << max_tracker->global_max_trigger_arrival_time
<< ")\n";
cout << ss.str().c_str() << flush;
}
// FIXME is it necessary for the dedispersion thread to wait for L1B?
// If not, it would be clearer to move this into l1_server::join_all_threads().
if (l1b_subprocess) {
int l1b_status = l1b_subprocess->wait_for_child();
if (config.l1_verbosity >= 1)
cout << "l1b process exited with status " << l1b_status << endl;
}
}
// Note: Called if config.tflag == false.
void dedispersion_thread_context::_toy_thread_main() const
{
assert(config.tflag);
assert(ibeam >= 0 && ibeam < config.nbeams);
ch_frb_io::pin_thread_to_cores(allowed_cores);
// FIXME: beam_id stuff is more confusing than it needs to be! This will be simplified soon.
int nbeams = config.nbeams;
int nstreams = config.nstreams;
int nbeams_per_stream = xdiv(nbeams, nstreams);
int ibeam_within_stream = ibeam % nbeams_per_stream;
// FIXME: stream-starting stuff also needs cleanup.
sp->start_stream();
for (;;) {
auto chunk = sp->get_assembled_chunk(ibeam_within_stream);
// Some voodoo to reduce interleaved output
usleep(ibeam * 10000);
if (!chunk) {
cout << (" [beam" + to_string(ibeam) + "]: got NULL chunk, exiting\n");
return;
}
if (ibeam_within_stream == 0) {
auto event_counts = sp->get_event_counts();
stringstream ss;
ss << sp->ini_params.ipaddr << ":" << sp->ini_params.udp_port << ": "
<< " nrecv=" << event_counts[ch_frb_io::intensity_network_stream::packet_received]
<< ", ngood=" << event_counts[ch_frb_io::intensity_network_stream::packet_good]
<< ", nbad=" << event_counts[ch_frb_io::intensity_network_stream::packet_bad]
<< ", ndropped=" << event_counts[ch_frb_io::intensity_network_stream::packet_dropped]
<< ", ahit=" << event_counts[ch_frb_io::intensity_network_stream::assembler_hit]
<< ", amiss=" << event_counts[ch_frb_io::intensity_network_stream::assembler_miss]
<< "\n";
string s = ss.str();
cout << s.c_str();
}
if (config.l1_verbosity >= 2)
cout << (" [beam" + to_string(ibeam) + "]: read chunk " + to_string(chunk->ichunk) + "\n");
//chunk->decode(intensity, weights, istride, wstride);
//chunk.reset();
}
}
static void dedispersion_thread_main(const dedispersion_thread_context &context)
{
try {
if (context.config.tflag)
context._toy_thread_main();
else
context._thread_main();
}
catch (exception &e) {
cerr << e.what() << "\n";
throw;
}
}