Skip to content

Commit

Permalink
Bring in code from the latest SDK (flutter#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
nex3 authored Feb 10, 2018
1 parent 910bd47 commit 0f37053
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 1.0.7

* Support the latest dev SDK.

## 1.0.6

* Declare support for `async` 2.0.0.
Expand Down
79 changes: 51 additions & 28 deletions lib/src/copy/bytes_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
// Because it's copied directly, there are no modifications from the original.
//
// This is up-to-date as of sdk revision
// e41fb4cafd6052157dbc1490d437045240f4773f.
// 365f7b5a8b6ef900a5ee23913b7203569b81b175.

import 'dart:math';
import 'dart:typed_data';

/// Builds a list of bytes, allowing bytes and lists of bytes to be added at the
Expand Down Expand Up @@ -46,7 +45,7 @@ abstract class BytesBuilder {

/// Returns the contents of `this` and clears `this`.
///
/// The list returned is a view of the the internal buffer, limited to the
/// The list returned is a view of the internal buffer, limited to the
/// [length].
List<int> takeBytes();

Expand All @@ -72,24 +71,22 @@ class _CopyingBytesBuilder implements BytesBuilder {
// Start with 1024 bytes.
static const int _INIT_SIZE = 1024;

static final _emptyList = new Uint8List(0);

int _length = 0;
Uint8List _buffer;

_CopyingBytesBuilder([int initialCapacity = 0])
: _buffer = (initialCapacity <= 0)
? _emptyList
: new Uint8List(_pow2roundup(initialCapacity));

void add(List<int> bytes) {
int bytesLength = bytes.length;
if (bytesLength == 0) return;
int required = _length + bytesLength;
if (_buffer == null) {
int size = _pow2roundup(required);
size = max(size, _INIT_SIZE);
_buffer = new Uint8List(size);
} else if (_buffer.length < required) {
// We will create a list in the range of 2-4 times larger than
// required.
int size = _pow2roundup(required) * 2;
var newBuffer = new Uint8List(size);
newBuffer.setRange(0, _buffer.length, _buffer);
_buffer = newBuffer;
if (_buffer.length < required) {
_grow(required);
}
assert(_buffer.length >= required);
if (bytes is Uint8List) {
Expand All @@ -103,18 +100,39 @@ class _CopyingBytesBuilder implements BytesBuilder {
}

void addByte(int byte) {
add([byte]);
if (_buffer.length == _length) {
// The grow algorithm always at least doubles.
// If we added one to _length it would quadruple unnecessarily.
_grow(_length);
}
assert(_buffer.length > _length);
_buffer[_length] = byte;
_length++;
}

void _grow(int required) {
// We will create a list in the range of 2-4 times larger than
// required.
int newSize = required * 2;
if (newSize < _INIT_SIZE) {
newSize = _INIT_SIZE;
} else {
newSize = _pow2roundup(newSize);
}
var newBuffer = new Uint8List(newSize);
newBuffer.setRange(0, _buffer.length, _buffer);
_buffer = newBuffer;
}

List<int> takeBytes() {
if (_buffer == null) return new Uint8List(0);
if (_length == 0) return _emptyList;
var buffer = new Uint8List.view(_buffer.buffer, 0, _length);
clear();
return buffer;
}

List<int> toBytes() {
if (_buffer == null) return new Uint8List(0);
if (_length == 0) return _emptyList;
return new Uint8List.fromList(
new Uint8List.view(_buffer.buffer, 0, _length));
}
Expand All @@ -127,10 +145,11 @@ class _CopyingBytesBuilder implements BytesBuilder {

void clear() {
_length = 0;
_buffer = null;
_buffer = _emptyList;
}

int _pow2roundup(int x) {
static int _pow2roundup(int x) {
assert(x > 0);
--x;
x |= x >> 1;
x |= x >> 2;
Expand All @@ -143,24 +162,28 @@ class _CopyingBytesBuilder implements BytesBuilder {

class _BytesBuilder implements BytesBuilder {
int _length = 0;
final _chunks = <List<int>>[];
final List<Uint8List> _chunks = [];

void add(List<int> bytes) {
if (bytes is! Uint8List) {
bytes = new Uint8List.fromList(bytes);
Uint8List typedBytes;
if (bytes is Uint8List) {
typedBytes = bytes;
} else {
typedBytes = new Uint8List.fromList(bytes);
}
_chunks.add(bytes);
_length += bytes.length;
_chunks.add(typedBytes);
_length += typedBytes.length;
}

void addByte(int byte) {
add([byte]);
_chunks.add(new Uint8List(1)..[0] = byte);
_length++;
}

List<int> takeBytes() {
if (_chunks.length == 0) return new Uint8List(0);
if (_length == 0) return _CopyingBytesBuilder._emptyList;
if (_chunks.length == 1) {
var buffer = _chunks.single;
var buffer = _chunks[0];
clear();
return buffer;
}
Expand All @@ -175,7 +198,7 @@ class _BytesBuilder implements BytesBuilder {
}

List<int> toBytes() {
if (_chunks.length == 0) return new Uint8List(0);
if (_length == 0) return _CopyingBytesBuilder._emptyList;
var buffer = new Uint8List(_length);
int offset = 0;
for (var chunk in _chunks) {
Expand Down
34 changes: 21 additions & 13 deletions lib/src/copy/io_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
// e41fb4cafd6052157dbc1490d437045240f4773f.
// 365f7b5a8b6ef900a5ee23913b7203569b81b175.

import 'dart:async';

Expand All @@ -24,32 +24,40 @@ class StreamSinkImpl<T> implements StreamSink<T> {

StreamSinkImpl(this._target);

// The _reportClosedSink method has been deleted for web_socket_channel. This
// method did nothing but print to stderr, which is unavailable here.

void add(T data) {
if (_isClosed) return;
if (_isClosed) {
return;
}
_controller.add(data);
}

void addError(error, [StackTrace stackTrace]) {
if (_isClosed) {
return;
}
_controller.addError(error, stackTrace);
}

Future addStream(Stream<T> stream) {
if (_isBound) {
throw new StateError("StreamSink is already bound to a stream");
}
_isBound = true;
if (_hasError) return done;
// Wait for any sync operations to complete.
Future targetAddStream() {
return _target.addStream(stream).whenComplete(() {
_isBound = false;
});
}

if (_controllerInstance == null) return targetAddStream();
var future = _controllerCompleter.future;
_controllerInstance.close();
return future.then((_) => targetAddStream());
_isBound = true;
var future = _controllerCompleter == null
? _target.addStream(stream)
: _controllerCompleter.future.then((_) => _target.addStream(stream));
_controllerInstance?.close();

// Wait for any pending events in [_controller] to be dispatched before
// adding [stream].
return future.whenComplete(() {
_isBound = false;
});
}

Future flush() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/copy/web_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
// e41fb4cafd6052157dbc1490d437045240f4773f.
// 365f7b5a8b6ef900a5ee23913b7203569b81b175.

/// Web socket status codes used when closing a web socket connection.
abstract class WebSocketStatus {
Expand Down
63 changes: 39 additions & 24 deletions lib/src/copy/web_socket_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
// e41fb4cafd6052157dbc1490d437045240f4773f.
// 365f7b5a8b6ef900a5ee23913b7203569b81b175.

import 'dart:async';
import 'dart:convert';
Expand Down Expand Up @@ -57,16 +57,21 @@ class _WebSocketOpcode {
static const int RESERVED_F = 15;
}

class _EncodedString {
final List<int> bytes;
_EncodedString(this.bytes);
}

/// The web socket protocol transformer handles the protocol byte stream
/// which is supplied through the [:handleData:]. As the protocol is processed,
/// which is supplied through the `handleData`. As the protocol is processed,
/// it'll output frame data as either a List<int> or String.
///
/// Important information about usage: Be sure you use cancelOnError, so the
/// socket will be closed when the processor encounter an error. Not using it
/// will lead to undefined behaviour.
// TODO(ajohnsen): make this transformer reusable?
class _WebSocketProtocolTransformer
implements StreamTransformer<List<int>, dynamic>, EventSink<List<int>> {
class _WebSocketProtocolTransformer extends StreamTransformerBase<List<int>,
dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ >
implements EventSink<List<int>> {
static const int START = 0;
static const int LEN_FIRST = 1;
static const int LEN_REST = 2;
Expand All @@ -93,15 +98,16 @@ class _WebSocketProtocolTransformer
int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
String closeReason = "";

EventSink _eventSink;
EventSink<dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ > _eventSink;

final bool _serverSide;
final List _maskingBytes = new List(4);
final BytesBuilder _payload = new BytesBuilder(copy: false);

_WebSocketProtocolTransformer([this._serverSide = false]);

Stream bind(Stream stream) {
Stream<dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ > bind(
Stream<List<int>> stream) {
return new Stream.eventTransformed(stream, (EventSink eventSink) {
if (_eventSink != null) {
throw new StateError("WebSocket transformer already used.");
Expand Down Expand Up @@ -319,7 +325,7 @@ class _WebSocketProtocolTransformer

switch (_currentMessageType) {
case _WebSocketMessageType.TEXT:
_eventSink.add(UTF8.decode(bytes));
_eventSink.add(utf8.decode(bytes));
break;
case _WebSocketMessageType.BINARY:
_eventSink.add(bytes);
Expand All @@ -344,7 +350,7 @@ class _WebSocketProtocolTransformer
throw new WebSocketChannelException("Protocol error");
}
if (payload.length > 2) {
closeReason = UTF8.decode(payload.sublist(2));
closeReason = utf8.decode(payload.sublist(2));
}
}
_state = CLOSED;
Expand Down Expand Up @@ -392,14 +398,15 @@ class _WebSocketPong {

// TODO(ajohnsen): Make this transformer reusable.
class _WebSocketOutgoingTransformer
implements StreamTransformer<dynamic, List<int>>, EventSink {
extends StreamTransformerBase<dynamic, List<int>> implements EventSink {
final WebSocketImpl webSocket;
EventSink<List<int>> _eventSink;

_WebSocketOutgoingTransformer(this.webSocket);

Stream<List<int>> bind(Stream stream) {
return new Stream.eventTransformed(stream, (eventSink) {
return new Stream<List<int>>.eventTransformed(stream,
(EventSink<List<int>> eventSink) {
if (_eventSink != null) {
throw new StateError("WebSocket transformer already used");
}
Expand All @@ -422,14 +429,15 @@ class _WebSocketOutgoingTransformer
if (message != null) {
if (message is String) {
opcode = _WebSocketOpcode.TEXT;
data = UTF8.encode(message);
data = utf8.encode(message);
} else if (message is List<int>) {
opcode = _WebSocketOpcode.BINARY;
data = message;
} else if (message is _EncodedString) {
opcode = _WebSocketOpcode.TEXT;
data = message.bytes;
} else {
if (message is List<int>) {
data = message;
opcode = _WebSocketOpcode.BINARY;
} else {
throw new ArgumentError(message);
}
throw new ArgumentError(message);
}
} else {
opcode = _WebSocketOpcode.TEXT;
Expand All @@ -450,17 +458,24 @@ class _WebSocketOutgoingTransformer
data.add((code >> 8) & 0xFF);
data.add(code & 0xFF);
if (reason != null) {
data.addAll(UTF8.encode(reason));
data.addAll(utf8.encode(reason));
}
}
addFrame(_WebSocketOpcode.CLOSE, data);
_eventSink.close();
}

void addFrame(int opcode, List<int> data) =>
createFrame(opcode, data, webSocket._serverSide, false).forEach((e) {
_eventSink.add(e);
});
void addFrame(int opcode, List<int> data) {
createFrame(
opcode,
data,
webSocket._serverSide,
// Logic around _deflateHelper was removed here, since ther ewill never
// be a deflate helper for a cross-platform WebSocket client.
false).forEach((e) {
_eventSink.add(e);
});
}

static Iterable<List<int>> createFrame(
int opcode, List<int> data, bool serverSide, bool compressed) {
Expand Down Expand Up @@ -563,7 +578,7 @@ class _WebSocketConsumer implements StreamConsumer {
StreamSubscription _subscription;
bool _issuedPause = false;
bool _closed = false;
final Completer _closeCompleter = new Completer();
final Completer _closeCompleter = new Completer<WebSocketImpl>();
Completer _completer;

_WebSocketConsumer(this.webSocket, this.sink);
Expand Down
Loading

0 comments on commit 0f37053

Please sign in to comment.