-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathchime_packetizer.cpp
137 lines (100 loc) · 4.97 KB
/
chime_packetizer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <sstream>
#include "rf_pipelines_internals.hpp"
#ifdef HAVE_CH_FRB_IO
#include <ch_frb_io.hpp>
#endif
using namespace std;
namespace rf_pipelines {
#if 0
}; // pacify emacs c-mode
#endif
#ifndef HAVE_CH_FRB_IO
shared_ptr<wi_transform> make_chime_packetizer(const string &dstname, int nfreq_per_packet, int nt_per_chunk, int nt_per_packet, float wt_cutoff, double target_gbps, int beam_id)
{
throw runtime_error("rf_pipelines::make_chime_packetizer() was called, but rf_pipelines was compiled without ch_frb_io");
}
#else // HAVE_CH_FRB_IO
struct chime_packetizer : public wi_transform {
ch_frb_io::intensity_network_ostream::initializer ini_params;
std::shared_ptr<ch_frb_io::intensity_network_ostream> ostream;
chime_packetizer(const std::string &dstname, int nfreq_per_packet, int nt_per_chunk, int nt_per_packet, float wt_cutoff, double target_gbps, int beam_id=0);
virtual void _bind_transform(Json::Value &json_attrs) override;
virtual void _start_pipeline(Json::Value &json_attrs) override;
virtual void _process_chunk(float *intensity, ssize_t istride, float *weights, ssize_t wstride, ssize_t pos) override;
virtual void _end_pipeline(Json::Value &json_output) override;
virtual void _reset() override;
};
// Validates the packetization arguments and stores them in 'ini_params'.
// The fields 'nupfreq' and 'fpga_counts_per_sample' are not set here; they are
// filled in later by _bind_transform().
//
// Throws runtime_error if:
//   - nfreq_coarse_per_packet is not a positive divisor of the total number of coarse channels
//   - nt_per_chunk or nt_per_packet is not positive
//   - nt_per_chunk is not a multiple of nt_per_packet
chime_packetizer::chime_packetizer(const string &dstname, int nfreq_coarse_per_packet, int nt_per_chunk, int nt_per_packet, float wt_cutoff, double target_gbps, int beam_id) :
    wi_transform("chime_packetizer")
{
    constexpr int ncoarse = ch_frb_io::constants::nfreq_coarse_tot;

    // The positivity test is evaluated first, so the modulus never sees a zero divisor.
    const bool packet_divides_band = (nfreq_coarse_per_packet > 0) && ((ncoarse % nfreq_coarse_per_packet) == 0);

    if (!packet_divides_band)
        throw runtime_error("chime_packetizer: currently nfreq_coarse_per_packet must be a divisor of " + to_string(ncoarse));

    if (nt_per_chunk <= 0)
        throw runtime_error("chime_packetizer: nt_per_chunk > 0 is required");

    if (nt_per_packet <= 0)
        throw runtime_error("chime_packetizer: nt_per_packet > 0 is required");

    if ((nt_per_chunk % nt_per_packet) != 0)
        throw runtime_error("chime_packetizer: nt_per_chunk must be a multiple of nt_per_packet");

    // Record the stream parameters which are known at construction time.
    auto &params = this->ini_params;
    params.beam_ids = { beam_id };
    params.dstname = dstname;
    params.nfreq_coarse_per_packet = nfreq_coarse_per_packet;
    params.nt_per_chunk = nt_per_chunk;
    params.nt_per_packet = nt_per_packet;
    params.wt_cutoff = wt_cutoff;
    params.target_gbps = target_gbps;

    // Every coarse channel is listed, in increasing order: ids 0, 1, ..., ncoarse-1.
    auto &freq_ids = params.coarse_freq_ids;
    freq_ids.resize(ncoarse, -1);

    for (int id = 0; id < ncoarse; ++id)
        freq_ids[id] = id;

    // Base class member: chunk size used by the pipeline for this transform.
    nt_chunk = nt_per_chunk;
}
// Called after (nfreq, nds) are initialized.
//
// Completes 'ini_params' with the two fields that depend on the stream:
//   - nupfreq: nfreq divided by the total number of coarse frequency channels
//   - fpga_counts_per_sample: json_attrs["dt_sample"] divided by the duration of
//     one FPGA count, rounded to the nearest integer
//
// Throws runtime_error if:
//   - nfreq is not a multiple of the number of coarse channels
//   - json_attrs has no member 'dt_sample'
//   - dt_sample is shorter than one FPGA count (this includes zero and negative values)
//   - dt_sample differs from an integer number of FPGA counts by more than 0.01 counts
void chime_packetizer::_bind_transform(Json::Value &json_attrs)
{
    constexpr int nfreq_coarse = ch_frb_io::constants::nfreq_coarse_tot;
    constexpr double seconds_per_fpga_count = ch_frb_io::constants::dt_fpga;

    if (nfreq % nfreq_coarse)
        throw runtime_error("chime_packetizer: currently nfreq must be a multiple of " + to_string(nfreq_coarse) + " (see comment in rf_pipelines.hpp)");
    if (!json_attrs.isMember("dt_sample"))
        throw runtime_error("chime_packetizer: expected json_attrs to contain member 'dt_sample'");

    this->ini_params.nupfreq = xdiv(nfreq, nfreq_coarse);

    // infer fpga_counts_per_sample from dt_sample
    double dt_sample = json_attrs["dt_sample"].asDouble();
    double f = dt_sample / seconds_per_fpga_count;

    // Anything below half an FPGA count would round to zero counts per sample.  A value
    // within 0.01 of zero would then slip through the "multiple" test below, and
    // _process_chunk() would stamp every chunk with FPGA count 0.  The comparison is
    // written so that a NaN is rejected here too, before it reaches the integer cast.
    if (!(f >= 0.5)) {
        stringstream ss;
        ss << "chime_packetizer: dt_sample (=" << dt_sample << " seconds) must be at least one FPGA count (" << seconds_per_fpga_count << " seconds)";
        throw runtime_error(ss.str());
    }

    this->ini_params.fpga_counts_per_sample = int(f+0.5);   // round to nearest integer

    if (fabs(f - ini_params.fpga_counts_per_sample) > 0.01) {
        // We use a stringstream here, since to_string() gives a weird formatting
        stringstream ss;
        ss << "chime_packetizer: currently dt_sample must be a multiple of " << seconds_per_fpga_count << " seconds (see comment in rf_pipelines.hpp)";
        throw runtime_error(ss.str());
    }

    // FIXME it would be nice to have an ini_params.check_validity() method to call here.
}
// Creates the network output stream from the parameters collected in 'ini_params'.
// The json_attrs argument is not used.
void chime_packetizer::_start_pipeline(Json::Value &json_attrs)
{
    ostream = ch_frb_io::intensity_network_ostream::make(this->ini_params);
}
void chime_packetizer::_process_chunk(float *intensity, ssize_t istride, float *weights, ssize_t wstride, ssize_t pos)
{
this->ostream->send_chunk(intensity, istride, weights, wstride, pos * ini_params.fpga_counts_per_sample);
}
// End-of-pipeline hook: shuts the output stream down via _reset().
// Nothing is written to json_output.
void chime_packetizer::_end_pipeline(Json::Value &json_output)
{
    _reset();
}
// Ends the output stream, if there is one, and drops our reference to it.
// Does nothing when no stream exists, so repeated calls are harmless.
void chime_packetizer::_reset()
{
    if (!ostream)
        return;

    const bool join_network_thread = true;
    ostream->end_stream(join_network_thread);
    ostream.reset();
}
shared_ptr<wi_transform> make_chime_packetizer(const string &dstname, int nfreq_per_packet, int nt_per_chunk, int nt_per_packet, float wt_cutoff, double target_gbps, int beam_id)
{
return make_shared<chime_packetizer> (dstname, nfreq_per_packet, nt_per_chunk, nt_per_packet, wt_cutoff, target_gbps, beam_id);
}
#endif // HAVE_CH_FRB_IO
} // namespace rf_pipelines