Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add L1 data injection transform #26

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f887b17
steal uint64_t_from_json function from rfimask branch
dstndstn Oct 18, 2018
d4dcd95
add injector class
dstndstn Oct 18, 2018
5f88902
mutex-protect the vector of data to inject; track the FPGAcount for t…
dstndstn Oct 18, 2018
e9a84de
injector: actually add injected data
dstndstn Oct 20, 2018
7907ca0
remove data to inject after injected; report number of frequencies be…
dstndstn Oct 20, 2018
fff433a
injected data: use offset in samples, not fpga counts
dstndstn Oct 21, 2018
c456a31
Merge branch 'kms_rfimask' into inj+rfi
dstndstn Oct 21, 2018
7a4439e
fixes
dstndstn Oct 21, 2018
b07b5a6
add transform that writes out assembled_chunks in msgpack format
dstndstn Oct 21, 2018
b42c40e
Merge branch 'master' into dstn-injection
dstndstn Nov 13, 2018
abe5f62
oops, fix merge mistakes
dstndstn Nov 13, 2018
86e692b
remove duplicate json uint64 definition
dstndstn Nov 13, 2018
1ab5c17
remove excessive chat
dstndstn Nov 27, 2018
f751d02
assembled-chunk-write: store ch-frb-io chunk indices, not rf_pipeline…
dstndstn Nov 27, 2018
4b184cf
rename injector to intensity_injector
dstndstn Nov 27, 2018
c6a06e5
hold the mutex for less time; delete inject_data requests whose time …
dstndstn Nov 27, 2018
5b525bf
rename class from injector to intensity_injector
dstndstn Nov 27, 2018
ab15498
remove last_fpgacount_processed
dstndstn Nov 27, 2018
eec8b45
remove FPGA counts from intensity_injector
dstndstn Nov 27, 2018
f2cb914
move inject_data checking here, vs l1-rpc
dstndstn Nov 28, 2018
7ed1cf4
quiet logging; fix int type warnings
dstndstn Nov 28, 2018
9a2641f
quiet
dstndstn Jan 18, 2019
c0aee13
add comment about make_chime_assembled_chunk_file_writer's filename_p…
dstndstn Jan 18, 2019
a40d448
first try at class pipeline_spool (untested)
kmsmith137 Jan 21, 2019
d0194b1
test-core-pipeline-logic.cpp: use pipeline_spool instead of morally e…
kmsmith137 Jan 21, 2019
4e8e781
just update a comment
kmsmith137 Jan 21, 2019
9099d30
add test-injection code
dstndstn Jan 23, 2019
ae1669f
test-injection: compute the expected spooled data and compare
dstndstn Jan 24, 2019
0aff0b4
test-injector: handle nds
dstndstn Jan 24, 2019
f14998e
Merge branch 'master' into dstn-injection
dstndstn Jan 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,32 @@ OFILES = badchannel_mask.o \
chime_file_stream.o \
chime_file_stream_base.o \
chime_file_writer.o \
chime_frb_file_stream.o \
chime_assembled_chunk_file_writer.o \
chime_frb_file_stream.o \
chime_network_stream.o \
chime_packetizer.o \
chunked_pipeline_object.o \
file_utils.o \
gaussian_noise_stream.o \
intensity_clippers.o \
intensity_injector.o \
json_utils.o \
lexical_cast.o \
mask_counter.o \
mask_expander.o \
mask_measurements_ringbuf.o \
outdir_manager.o \
pipeline.o \
pipeline_fork.o \
pipeline_object.o \
pipeline_spool.o \
plot_utils.o \
polynomial_detrenders.o \
ring_buffer.o \
run_params.o \
spectrum_analyzer.o \
spline_detrenders.o \
std_dev_clippers.o \
mask_counter.o \
mask_measurements_ringbuf.o \
wi_sub_pipeline.o \
wi_stream.o \
wi_transform.o \
Expand Down Expand Up @@ -78,7 +81,7 @@ PYFILES=rf_pipelines/rf_pipelines_c.so \
BINFILES = rfp-time

# C++ unit test binaries which are not installed in $(BINDIR).
TESTBINFILES = test-misc test-ring-buffer test-core-pipeline-logic test-file-stream-base
TESTBINFILES = test-misc test-ring-buffer test-core-pipeline-logic test-file-stream-base test-injection

# Not actually a unit test, but Makefile doesn't need to distinguish
TESTBINFILES += visit-pipeline-example
Expand Down Expand Up @@ -213,3 +216,7 @@ test-file-stream-base: test-file-stream-base.o $(OFILES)

visit-pipeline-example: visit-pipeline-example.o $(OFILES)
$(CPP) $(CPP_LFLAGS) -o $@ $^ $(LIBS)

test-injection: test-injection.o $(OFILES)
$(CPP) $(CPP_LFLAGS) -o $@ $^ $(LIBS)

141 changes: 141 additions & 0 deletions chime_assembled_chunk_file_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include <algorithm>
#include "rf_pipelines_internals.hpp"

#ifdef HAVE_CH_FRB_IO
#include <ch_frb_io.hpp>
#endif

using namespace std;
using namespace ch_frb_io;

namespace rf_pipelines {
#if 0
}; // pacify emacs c-mode!
#endif


#ifndef HAVE_CH_FRB_IO

shared_ptr<wi_transform> make_chime_assembled_chunk_file_writer(const string &filename, bool clobber)
{
throw runtime_error("rf_pipelines::make_chime_assembled_chunk_file_writer() was called, but rf_pipelines was compiled without ch_frb_io");
}

#else // HAVE_CH_FRB_IO

struct chime_assembled_chunk_file_writer : public wi_transform {
// Constructor args
const string filename;
const bool clobber;

// Stream params (not available until set_stream() gets called)
assembled_chunk::initializer chunk_ini;
int ichunk_offset = 0;

chime_assembled_chunk_file_writer(const string &filename_, bool clobber_) :
wi_transform("chime_assembled_chunk_file_writer"),
filename(filename_),
clobber(clobber_)
{
this->name = "chime_assembled_chunk_file_writer(" + filename + ")";
this->nt_chunk = constants::nt_per_assembled_chunk;
}

virtual void _start_pipeline(Json::Value &j) override
{
// FIXME --
chunk_ini.beam_id = 0;
chunk_ini.nupfreq = this->nfreq / constants::nfreq_coarse_tot;
chunk_ini.nrfifreq = 0; // ??
chunk_ini.nt_per_packet = 16; // ??
chunk_ini.fpga_counts_per_sample = j["fpga_counts_per_sample"].asInt();
chunk_ini.frame0_nano = j["frame0_nano"].asUInt64();
// This converts ch_frb_io chunk number (which scales directly
// to FPGA counts) to rf_pipelines chunk counts (which start
// from 0 at the beginning of the stream).
this->ichunk_offset = j["initial_fpga_count"].asUInt64() / (uint64_t)(chunk_ini.fpga_counts_per_sample * constants::nt_per_assembled_chunk);
}

virtual void _process_chunk(float *intensity, ssize_t istride, float *weights, ssize_t wstride, ssize_t pos) override
{
chunk_ini.ichunk = this->ichunk_offset + pos / constants::nt_per_assembled_chunk;
shared_ptr<assembled_chunk> ch = assembled_chunk::make(chunk_ini);
float ilo = 1e9;
float ihi = -1e9;
int nbad = 0;
for (int f=0; f<this->nfreq; f++) {
for (int t=0; t<this->nt_chunk; t++) {
if (weights[f*istride + t] == 0) {
nbad++;
continue;
}
float ival = intensity[f*istride + t];
if (ival < ilo)
ilo = ival;
if (ival > ihi)
ihi = ival;
}
}
// Set scale and offset to fill 2 to 252.
// intensity = scale * (8-bit value) + offset
float scale = (ihi - ilo) / 250.;
float offset = ilo - 2.*scale;
for (int i=0; i<ch->nscales; i++) {
ch->scales[i] = scale;
ch->offsets[i] = offset;
}
int i = 0;
for (int f=0; f<this->nfreq; f++) {
for (int t=0; t<this->nt_chunk; t++) {
uint8_t val;
if (weights[f*istride + t] == 0)
val = 0;
else
val = uint8_t((intensity[f*istride + t] - offset) / scale);
ch->data[i] = val;
i++;
}
}
string thisfn = ch->format_filename(filename);
if (!clobber && file_exists(thisfn))
throw runtime_error(thisfn + ": file already exists and clobber=false was specified in the the chime_assembled_chunk_file_writer constructor");
ch->write_msgpack_file(thisfn, false);
}

virtual Json::Value jsonize() const override
{
Json::Value ret;
ret["class_name"] = "chime_assembled_chunk_file_writer";
ret["filename"] = filename;
ret["clobber"] = clobber;
return ret;
}

static shared_ptr<chime_assembled_chunk_file_writer> from_json(const Json::Value &j)
{
string filename = string_from_json(j, "filename");
bool clobber = bool_from_json(j, "clobber");
return make_shared<chime_assembled_chunk_file_writer> (filename, clobber);
}
};


namespace {
struct _init {
_init() {
pipeline_object::register_json_deserializer("chime_assembled_chunk_file_writer", chime_assembled_chunk_file_writer::from_json);
}
} init;
}


// See rf_pipelines.hpp for an explanation of the arguments
shared_ptr<wi_transform> make_chime_assembled_chunk_file_writer(const string &filename, bool clobber)
{
return make_shared<chime_assembled_chunk_file_writer> (filename, clobber);
}

#endif // HAVE_CH_FRB_IO


} // namespace rf_pipelines
187 changes: 187 additions & 0 deletions intensity_injector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#include <algorithm>

#include "rf_pipelines_internals.hpp"
#include "rf_pipelines_inventory.hpp"

using namespace std;

namespace rf_pipelines {
#if 0
}; // pacify emacs c-mode
#endif

typedef lock_guard<mutex> ulock;

string inject_data::check(int nfreq) {
if (nfreq == 0)
return "intensity_injector: nfreq=0. This probably means you tried to inject data before the pipeline has been bound.";
if (this->mode != 0)
return "intensity_injector: mode=" + to_string(this->mode) + " but only mode=0 is known";
if ((int)this->sample_offset.size() != nfreq)
return "intensity_injector: sample_offset array has size " + to_string(this->sample_offset.size()) + ", expected nfreq=" + to_string(nfreq);
if ((int)this->ndata.size() != nfreq)
return "intensity_injector: ndata array has size " + to_string(this->ndata.size()) + ", expected nfreq=" + to_string(nfreq);
size_t nd = 0;
for (uint16_t n : this->ndata)
nd += n;
if (this->data.size() != nd)
return "intensity_injector: data array has size " + to_string(this->data.size()) + ", expected sum(ndata)=" + to_string(nd);
return "";
}


intensity_injector::intensity_injector(int nt_chunk) :
wi_transform("intensity_injector") {
this->nt_chunk = nt_chunk;
}

void intensity_injector::_bind_transform(Json::Value &json_attrs)
{
// Should be redundant with asserts elsewhere in rf_pipelines, but just being paranoid!
rf_assert(this->nds == 1);
}

void intensity_injector::_start_pipeline(Json::Value &json_attrs) {}

void intensity_injector::inject(shared_ptr<inject_data> inj) {
// Check input
string err = inj->check(nfreq);
if (err.size())
throw runtime_error(err);

// Compute range of sample values
int min_offset = inj->sample_offset[0];
int max_offset = inj->sample_offset[0] + inj->ndata[0];
for (size_t i=0; i<inj->sample_offset.size(); i++) {
min_offset = std::min(min_offset, inj->sample_offset[i]);
max_offset = std::max(max_offset, inj->sample_offset[i] + inj->ndata[i]);
}
inj->min_offset = min_offset;
inj->max_offset = max_offset;

if (inj->min_offset + inj->sample0 < pos_lo)
throw runtime_error("intensity_injector: data to inject is already in the past! Inject sample " + to_string(inj->min_offset + inj->sample0) + " vs now " + to_string(pos_lo));

ulock u(mutex);
to_inject.push_back(inj);
}

void intensity_injector::_process_chunk(float *intensity, ssize_t istride, float *weights, ssize_t wstride, ssize_t pos)
{
// Reminder: previous asserts have already checked that
// this->nds == 1

// Recall that the 'pos' argument is the current pipeline position
// in units of time samples (starts from 0 for first chunk
// processed by this rf_pipeline)

vector<shared_ptr<inject_data> > to_inject_now;
{
ulock u(mutex);
//cout << "Intensity_Injector transform: " << to_inject.size() << " chunks of data" << endl;
for (size_t idata=0; idata<to_inject.size(); idata++) {
auto data = to_inject[idata];
//cout << " Pipeline pos: " << pos << ", data injection sample0: " << data->sample0 << ", sample range " << data->sample0 + data->min_offset << " to " << data->sample0 + data->max_offset << endl;
// Index in the current chunk of data of "fpga0" of this injected data entry
if ((data->sample0 + data->max_offset) < pos) {
// This inject_data request's time has passed.
//cout << " This data injection's time has passed. Deleting" << endl;
to_inject.erase(to_inject.begin() + idata);
idata--;
continue;
}
if ((data->sample0 + data->min_offset) >= (pos + this->nt_chunk)) {
// This inject_data request is in the future.
//cout << " This data injection is in the future." << endl;
continue;
}
//cout << " Will inject!" << endl;
to_inject_now.push_back(data);
}
}
for (const auto &data : to_inject_now) {
// About int sizes here: data->sample0 may be large, as may
// pos (if we run for a long time).

// Index in this chunk of offset zero in the inject_data request;
// may be positive or negative.
ssize_t sample0 = data->sample0 - pos;
int nf = 0;
int ntotal = 0;
int nbefore = 0;
int nafter = 0;
// offset into the "data" array
ssize_t data_offset = 0;
for (size_t i=0; i<data->sample_offset.size(); i++) {
// save this index in case we need it below
ssize_t this_data_offset = data_offset;
// skip this frequency's data regardless of whether we use them
data_offset += data->ndata[i];
// sample start,end for this frequency
ssize_t f0 = sample0 + data->sample_offset[i];
ssize_t f1 = f0 + data->ndata[i];
if (f0 >= nt_chunk) {
// This frequency's data is after this chunk
nafter++;
continue;
}
if (f1 <= 0) {
// This frequency's data is before this chunk
nbefore++;
continue;
}
// Now select the subset of this frequency's data that overlaps
// this chunk. f0 and f1 are indices into this chunk of data.
int inj_offset = 0;
int ncopy = data->ndata[i];
if (f0 < 0) {
// we're injecting the tail end of this frequency's array
ncopy -= (-f0);
inj_offset += (-f0);
// it will start at the beginning of this chunk
f0 = 0;
}
if (f0 + ncopy > nt_chunk) {
ncopy = nt_chunk - f0;
// a tail of data remains
}
float* indata = intensity + i*istride + f0;
for (int j=0; j<ncopy; j++)
indata[j] += data->data[this_data_offset + inj_offset + j];
nf += 1;
ntotal += ncopy;
}
//cout << "Injected " << nf << " frequency bins, total of " << ntotal << " samples. N freq before: " << nbefore << ", after: " << nafter << endl;
}
}

Json::Value intensity_injector::jsonize() const
{
Json::Value ret;
ret["class_name"] = "intensity_injector";
ret["nt_chunk"] = int(nt_chunk);
return ret;
}

shared_ptr<intensity_injector>
intensity_injector::from_json(const Json::Value &j)
{
ssize_t nt_chunk = ssize_t_from_json(j, "nt_chunk");
return make_shared<intensity_injector>(nt_chunk);
}

namespace {
struct _init {
_init() {
pipeline_object::register_json_deserializer("intensity_injector", intensity_injector::from_json);
}
} init;
}

// Externally callable
shared_ptr<intensity_injector> make_intensity_injector(int nt_chunk) {
return make_shared<intensity_injector>(nt_chunk);
}

} // namespace rf_pipelines

2 changes: 0 additions & 2 deletions json_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ ssize_t ssize_t_from_json(const Json::Value &j, const string &k)
return v.asInt64();
}


uint64_t uint64_t_from_json(const Json::Value &j, const string &k)
{
const Json::Value &v = get_member(j, k);
Expand All @@ -125,7 +124,6 @@ uint64_t uint64_t_from_json(const Json::Value &j, const string &k)
return v.asUInt64();
}


double double_from_json(const Json::Value &j, const string &k)
{
const Json::Value &v = get_member(j, k);
Expand Down
Loading