// ringbuf.js
// WARNING: this is the same code as in ringbuf, just packaged differently - needs fixing!
// Ring buffer of typed-array elements stored in a SharedArrayBuffer.
//
// Memory layout of the shared buffer:
//   bytes 0..3  write index (Uint32, in elements)
//   bytes 4..7  read index  (Uint32, in elements)
//   bytes 8..   element storage, `capacity() + 1` slots
//
// One slot is always left unused so that "full" and "empty" can be told
// apart: the buffer is empty when both indices are equal, and full when the
// write index is one slot behind the read index. Both indices are accessed
// with Atomics so that the side pushing and the side popping can each observe
// the other's progress.
export default class RingBuffer {
  // Allocate a SharedArrayBuffer able to hold `capacity` elements of the typed
  // array class `type` (e.g. Float32Array), plus the two 4-byte indices and the
  // extra slot used to distinguish full from empty.
  // Throws if `type` has no (non-zero) BYTES_PER_ELEMENT.
  static getStorageForCapacity(capacity, type) {
    if (!type.BYTES_PER_ELEMENT) {
      throw 'Pass in a ArrayBuffer subclass';
    }
    const bytes = 8 + (capacity + 1) * type.BYTES_PER_ELEMENT;
    return new SharedArrayBuffer(bytes);
  }

  // `sab` is a SharedArrayBuffer with a capacity calculated by calling
  // `getStorageForCapacity` with the desired capacity. `type` is the typed
  // array class (the constructor, not an instance) of the elements.
  // Throws if `type` is not a constructor exposing BYTES_PER_ELEMENT.
  constructor(sab, type) {
    if (typeof type !== 'function' || !type.BYTES_PER_ELEMENT) {
      throw 'Pass a concrete typed array class as second argument';
    }
    // Maximum usable size is 1<<32 - type.BYTES_PER_ELEMENT bytes in the ring
    // buffer for this version, easily changeable.
    // -4 for the write ptr (uint32_t offsets)
    // -4 for the read ptr (uint32_t offsets)
    // _capacity counts the empty slot to distinguish between full and empty.
    // It is kept under a different name than the `capacity()` method: an
    // instance property called `capacity` would shadow the method and make it
    // uncallable. Floored so a buffer whose size is not an exact multiple of
    // the element size still yields an integer number of slots.
    this._type = type;
    this._capacity = Math.floor((sab.byteLength - 8) / type.BYTES_PER_ELEMENT);
    this.buf = sab;
    this.write_ptr = new Uint32Array(this.buf, 0, 1);
    this.read_ptr = new Uint32Array(this.buf, 4, 1);
    this.storage = new type(this.buf, 8, this._capacity);
  }

  // Returns the name of the typed array class used by this RingBuffer. This
  // allows implementing crude type checking.
  type() {
    return this._type.name;
  }

  // Push elements to the ring buffer. `elements` is a typed array of the same
  // type as passed in the ctor, to be written to the queue. If there is not
  // enough room, only the elements that fit are written.
  // Returns the number of elements written to the queue.
  push(elements) {
    const rd = Atomics.load(this.read_ptr, 0);
    const wr = Atomics.load(this.write_ptr, 0);
    const size = this._storage_capacity();
    if ((wr + 1) % size === rd) {
      // full
      return 0;
    }
    const to_write = Math.min(this._available_write(rd, wr), elements.length);
    // The write may wrap around the end of the storage: copy up to the end
    // first, then the remainder at the start.
    const first_part = Math.min(size - wr, to_write);
    const second_part = to_write - first_part;
    this._copy(elements, 0, this.storage, wr, first_part);
    this._copy(elements, first_part, this.storage, 0, second_part);
    // publish the enqueued data to the other side
    Atomics.store(this.write_ptr, 0, (wr + to_write) % size);
    return to_write;
  }

  // Read up to `elements.length` elements from the ring buffer. `elements` is
  // a typed array of the same type as passed in the ctor.
  // Returns the number of elements read from the queue, they are placed at the
  // beginning of the array passed as parameter. Slots of `elements` past the
  // returned count are left untouched.
  pop(elements) {
    const rd = Atomics.load(this.read_ptr, 0);
    const wr = Atomics.load(this.write_ptr, 0);
    if (wr === rd) {
      // empty
      return 0;
    }
    const size = this._storage_capacity();
    const to_read = Math.min(this._available_read(rd, wr), elements.length);
    // Bound the first chunk by `to_read` (not by `elements.length`), otherwise
    // stale storage would be copied out when the destination is larger than
    // the data actually available.
    const first_part = Math.min(size - rd, to_read);
    const second_part = to_read - first_part;
    this._copy(this.storage, rd, elements, 0, first_part);
    this._copy(this.storage, 0, elements, first_part, second_part);
    // release the dequeued slots to the other side
    Atomics.store(this.read_ptr, 0, (rd + to_read) % size);
    return to_read;
  }

  // True if the ring buffer is empty, false otherwise. This can be late on the
  // reader side: it can return true even if something has just been pushed.
  empty() {
    const rd = Atomics.load(this.read_ptr, 0);
    const wr = Atomics.load(this.write_ptr, 0);
    return wr === rd;
  }

  // True if the ring buffer is full, false otherwise. This can be late on the
  // write side: it can return true when something has just been popped.
  full() {
    const rd = Atomics.load(this.read_ptr, 0);
    const wr = Atomics.load(this.write_ptr, 0);
    return (wr + 1) % this._storage_capacity() === rd;
  }

  // The usable capacity for the ring buffer: the number of elements that can be
  // stored (one less than the number of storage slots).
  capacity() {
    return this._capacity - 1;
  }

  // Number of elements available for reading. This can be late, and report
  // fewer elements than are actually in the queue, when something has just
  // been enqueued.
  available_read() {
    const rd = Atomics.load(this.read_ptr, 0);
    const wr = Atomics.load(this.write_ptr, 0);
    return this._available_read(rd, wr);
  }

  // Number of elements available for writing. This can be late, and report
  // fewer elements than are actually available for writing, when something has
  // just been dequeued.
  available_write() {
    const rd = Atomics.load(this.read_ptr, 0);
    const wr = Atomics.load(this.write_ptr, 0);
    return this._available_write(rd, wr);
  }

  // private methods //

  // Number of elements available for reading, given a read and write pointer.
  // Equal pointers mean the buffer is empty, so that case yields 0.
  _available_read(rd, wr) {
    if (wr >= rd) {
      return wr - rd;
    }
    return wr + this._storage_capacity() - rd;
  }

  // Number of elements available for writing, given a read and write pointer.
  // One slot is always kept free, hence the `- 1`.
  _available_write(rd, wr) {
    let rv = rd - wr - 1;
    if (wr >= rd) {
      rv += this._storage_capacity();
    }
    return rv;
  }

  // The number of element slots in the storage, including the slot that is
  // always kept empty (i.e. not accounting for the space taken by the indices).
  _storage_capacity() {
    return this._capacity;
  }

  // Copy `size` elements from `input`, starting at offset `offset_input`, to
  // `output`, starting at offset `offset_output`. Does nothing if `size` is
  // zero or negative.
  _copy(input, offset_input, output, offset_output, size) {
    for (let i = 0; i < size; i++) {
      output[offset_output + i] = input[offset_input + i];
    }
  }
}