-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpipeline_fork.cpp
131 lines (97 loc) · 3.22 KB
/
pipeline_fork.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include "rf_pipelines_internals.hpp"
using namespace std;
namespace rf_pipelines {
#if 0
}; // pacify emacs c-mode
#endif
struct pipeline_fork : public pipeline_object
{
struct element {
// Initialized in constructor.
string input_bufname;
string output_bufname;
// Initialized in bind().
shared_ptr<ring_buffer> input_buffer;
shared_ptr<ring_buffer> output_buffer;
ssize_t csize = 0;
};
vector<element> elements;
pipeline_fork(const vector<pair<string,string>> &bufnames) :
pipeline_object("pipeline_fork")
{
unordered_set<string> all_names;
for (const auto &p: bufnames) {
for (const string &s: { p.first, p.second }) {
if (s.size() == 0)
_throw("empty bufname string was specified");
if (all_names.count(s) > 0)
_throw("duplicate bufname '" + s + "' was specified");
}
element e;
e.input_bufname = p.first;
e.output_bufname = p.second;
this->elements.push_back(e);
}
}
virtual void _bind(ring_buffer_dict &rb_dict, Json::Value &json_attrs) override
{
this->nt_chunk_out = nt_chunk_in;
this->nt_contig = nt_chunk_in;
this->nt_maxgap = 0;
for (element &e: this->elements) {
e.input_buffer = this->get_buffer(rb_dict, e.input_bufname);
e.output_buffer = this->create_buffer(rb_dict, e.output_bufname, e.input_buffer->cdims, e.input_buffer->nds);
e.csize = prod(e.input_buffer->cdims);
}
}
virtual ssize_t _advance() override
{
for (element &e: this->elements) {
ring_buffer_subarray src(e.input_buffer, pos_lo, pos_hi, ring_buffer::ACCESS_READ);
ring_buffer_subarray dst(e.output_buffer, pos_lo, pos_hi, ring_buffer::ACCESS_APPEND);
for (ssize_t i = 0; i < e.csize; i++)
memcpy(dst.data + i*dst.stride, src.data + i*src.stride, (pos_hi - pos_lo) * sizeof(float));
}
this->pos_lo = pos_hi.load();
return SSIZE_MAX;
}
virtual Json::Value jsonize() const override
{
Json::Value ret;
ret["class_name"] = "pipeline_fork";
ret["bufnames"] = Json::Value(Json::arrayValue);
for (const element &e: this->elements) {
Json::Value je(Json::arrayValue);
je.append(e.input_bufname);
je.append(e.output_bufname);
ret["bufnames"].append(je);
}
return ret;
}
static shared_ptr<pipeline_fork> from_json(const Json::Value &j)
{
const Json::Value &jb = array_from_json(j, "bufnames");
vector<pair<string,string>> bufnames;
for (int i = 0; i < int(jb.size()); i++) {
if (!jb[i].isArray() || (jb[i].size() != 2) || !jb[i][0].isString() || !jb[i][1].isString())
throw runtime_error("pipeline_fork::from_json: expected each element of 'bufnames' array to be a pair of strings");
string input_bufname = jb[i][0].asString();
string output_bufname = jb[i][1].asString();
bufnames.push_back(pair<string,string> (input_bufname, output_bufname));
}
return make_shared<pipeline_fork> (bufnames);
}
};
namespace {
struct _init {
_init() {
pipeline_object::register_json_deserializer("pipeline_fork", pipeline_fork::from_json);
}
} init;
}
// Externally callable factory function
shared_ptr<pipeline_object> make_pipeline_fork(const vector<pair<string,string>> &bufnames)
{
return make_shared<pipeline_fork> (bufnames);
}
} // namespace rf_pipelines