|
1 | 1 | 'use strict'; |
2 | 2 |
|
3 | | -const assert = require('assert'); |
4 | | -const util = require('util'); |
5 | | -const { Socket } = require('net'); |
6 | | -const { JSStream } = process.binding('js_stream'); |
7 | | -const uv = process.binding('uv'); |
8 | | -const debug = util.debuglog('stream_wrap'); |
9 | | -const errors = require('internal/errors'); |
10 | | - |
11 | | -function StreamWrap(stream) { |
12 | | - const handle = new JSStream(); |
13 | | - |
14 | | - this.stream = stream; |
15 | | - |
16 | | - this._list = null; |
17 | | - |
18 | | - const self = this; |
19 | | - handle.close = function(cb) { |
20 | | - debug('close'); |
21 | | - self.doClose(cb); |
22 | | - }; |
23 | | - handle.isAlive = function() { |
24 | | - return self.isAlive(); |
25 | | - }; |
26 | | - handle.isClosing = function() { |
27 | | - return self.isClosing(); |
28 | | - }; |
29 | | - handle.onreadstart = function() { |
30 | | - return self.readStart(); |
31 | | - }; |
32 | | - handle.onreadstop = function() { |
33 | | - return self.readStop(); |
34 | | - }; |
35 | | - handle.onshutdown = function(req) { |
36 | | - return self.doShutdown(req); |
37 | | - }; |
38 | | - handle.onwrite = function(req, bufs) { |
39 | | - return self.doWrite(req, bufs); |
40 | | - }; |
41 | | - |
42 | | - this.stream.pause(); |
43 | | - this.stream.on('error', function onerror(err) { |
44 | | - self.emit('error', err); |
45 | | - }); |
46 | | - this.stream.on('data', function ondata(chunk) { |
47 | | - if (typeof chunk === 'string' || this._readableState.objectMode === true) { |
48 | | - // Make sure that no further `data` events will happen |
49 | | - this.pause(); |
50 | | - this.removeListener('data', ondata); |
51 | | - |
52 | | - self.emit('error', new errors.Error('ERR_STREAM_WRAP')); |
53 | | - return; |
54 | | - } |
55 | | - |
56 | | - debug('data', chunk.length); |
57 | | - if (self._handle) |
58 | | - self._handle.readBuffer(chunk); |
59 | | - }); |
60 | | - this.stream.once('end', function onend() { |
61 | | - debug('end'); |
62 | | - if (self._handle) |
63 | | - self._handle.emitEOF(); |
64 | | - }); |
65 | | - |
66 | | - Socket.call(this, { |
67 | | - handle: handle |
68 | | - }); |
69 | | -} |
70 | | -util.inherits(StreamWrap, Socket); |
71 | | -module.exports = StreamWrap; |
72 | | - |
73 | | -// require('_stream_wrap').StreamWrap |
74 | | -StreamWrap.StreamWrap = StreamWrap; |
75 | | - |
76 | | -StreamWrap.prototype.isAlive = function isAlive() { |
77 | | - return true; |
78 | | -}; |
79 | | - |
80 | | -StreamWrap.prototype.isClosing = function isClosing() { |
81 | | - return !this.readable || !this.writable; |
82 | | -}; |
83 | | - |
84 | | -StreamWrap.prototype.readStart = function readStart() { |
85 | | - this.stream.resume(); |
86 | | - return 0; |
87 | | -}; |
88 | | - |
89 | | -StreamWrap.prototype.readStop = function readStop() { |
90 | | - this.stream.pause(); |
91 | | - return 0; |
92 | | -}; |
93 | | - |
94 | | -StreamWrap.prototype.doShutdown = function doShutdown(req) { |
95 | | - const self = this; |
96 | | - const handle = this._handle; |
97 | | - const item = this._enqueue('shutdown', req); |
98 | | - |
99 | | - this.stream.end(function() { |
100 | | - // Ensure that write was dispatched |
101 | | - setImmediate(function() { |
102 | | - if (!self._dequeue(item)) |
103 | | - return; |
104 | | - |
105 | | - handle.finishShutdown(req, 0); |
106 | | - }); |
107 | | - }); |
108 | | - return 0; |
109 | | -}; |
110 | | - |
111 | | -StreamWrap.prototype.doWrite = function doWrite(req, bufs) { |
112 | | - const self = this; |
113 | | - const handle = self._handle; |
114 | | - |
115 | | - var pending = bufs.length; |
116 | | - |
117 | | - // Queue the request to be able to cancel it |
118 | | - const item = self._enqueue('write', req); |
119 | | - |
120 | | - self.stream.cork(); |
121 | | - for (var n = 0; n < bufs.length; n++) |
122 | | - self.stream.write(bufs[n], done); |
123 | | - self.stream.uncork(); |
124 | | - |
125 | | - function done(err) { |
126 | | - if (!err && --pending !== 0) |
127 | | - return; |
128 | | - |
129 | | - // Ensure that this is called once in case of error |
130 | | - pending = 0; |
131 | | - |
132 | | - let errCode = 0; |
133 | | - if (err) { |
134 | | - const code = uv[`UV_${err.code}`]; |
135 | | - errCode = (err.code && code) ? code : uv.UV_EPIPE; |
136 | | - } |
137 | | - |
138 | | - // Ensure that write was dispatched |
139 | | - setImmediate(function() { |
140 | | - // Do not invoke callback twice |
141 | | - if (!self._dequeue(item)) |
142 | | - return; |
143 | | - |
144 | | - handle.doAfterWrite(req); |
145 | | - handle.finishWrite(req, errCode); |
146 | | - }); |
147 | | - } |
148 | | - |
149 | | - return 0; |
150 | | -}; |
151 | | - |
152 | | -function QueueItem(type, req) { |
153 | | - this.type = type; |
154 | | - this.req = req; |
155 | | - this.prev = this; |
156 | | - this.next = this; |
157 | | -} |
158 | | - |
159 | | -StreamWrap.prototype._enqueue = function _enqueue(type, req) { |
160 | | - const item = new QueueItem(type, req); |
161 | | - if (this._list === null) { |
162 | | - this._list = item; |
163 | | - return item; |
164 | | - } |
165 | | - |
166 | | - item.next = this._list.next; |
167 | | - item.prev = this._list; |
168 | | - item.next.prev = item; |
169 | | - item.prev.next = item; |
170 | | - |
171 | | - return item; |
172 | | -}; |
173 | | - |
174 | | -StreamWrap.prototype._dequeue = function _dequeue(item) { |
175 | | - assert(item instanceof QueueItem); |
176 | | - |
177 | | - var next = item.next; |
178 | | - var prev = item.prev; |
179 | | - |
180 | | - if (next === null && prev === null) |
181 | | - return false; |
182 | | - |
183 | | - item.next = null; |
184 | | - item.prev = null; |
185 | | - |
186 | | - if (next === item) { |
187 | | - prev = null; |
188 | | - next = null; |
189 | | - } else { |
190 | | - prev.next = next; |
191 | | - next.prev = prev; |
192 | | - } |
193 | | - |
194 | | - if (this._list === item) |
195 | | - this._list = next; |
196 | | - |
197 | | - return true; |
198 | | -}; |
199 | | - |
200 | | -StreamWrap.prototype.doClose = function doClose(cb) { |
201 | | - const self = this; |
202 | | - const handle = self._handle; |
203 | | - |
204 | | - setImmediate(function() { |
205 | | - while (self._list !== null) { |
206 | | - const item = self._list; |
207 | | - const req = item.req; |
208 | | - self._dequeue(item); |
209 | | - |
210 | | - const errCode = uv.UV_ECANCELED; |
211 | | - if (item.type === 'write') { |
212 | | - handle.doAfterWrite(req); |
213 | | - handle.finishWrite(req, errCode); |
214 | | - } else if (item.type === 'shutdown') { |
215 | | - handle.finishShutdown(req, errCode); |
216 | | - } |
217 | | - } |
218 | | - |
219 | | - // Should be already set by net.js |
220 | | - assert(self._handle === null); |
221 | | - cb(); |
222 | | - }); |
223 | | -}; |
| 3 | +module.exports = require('internal/wrap_js_stream'); |
0 commit comments