Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataChannel termination via the new DirectTransport #409

Merged
merged 24 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions TODO_DATACHANNEL_TERMINATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# TODO DataChannel Termination


## Implementation

- Should we store somewhere a message received via `dataProducer.send()` when we deliver it to a `dataConsumer` (of type 'sctp' or 'direct')?


## Documentation

- Set proper max size for `netstring` messages in `PayloadChannel` (in JS and C++). It must match `maxSctpMessageSize` in `DataTransport.ts`.

- `sctpStreamParameters` are now optional in `DataProducerOptions`.

- `sctpStreamParameters` getter can now return `undefined` in `DataProducer/Consumer`.

- Added `dataProducer/Consumer.type`: 'sctp' | 'direct'.

- `transport.consumeData()` (when in SCTP) now accepts optional arguments:

```ts
ordered?: boolean;
maxPacketLifeTime?: number;
maxRetransmits?: number;
```

- Added a new `direct` log tag.
2 changes: 1 addition & 1 deletion lib/Channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
switch (nsPayload[0]) {
// 123 = '{' (a Channel JSON messsage).
case 123:
this._processMessage(JSON.parse(nsPayload));
this._processMessage(JSON.parse(nsPayload.toString('utf8')));
break;
// 68 = 'D' (a debug log).
case 68:
Expand Down
37 changes: 35 additions & 2 deletions lib/DataConsumer.d.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { Channel } from './Channel';
import { PayloadChannel } from './PayloadChannel';
import { SctpStreamParameters } from './SctpParameters';
export declare type DataConsumerOptions = {
/**
* The id of the DataProducer to consume.
*/
dataProducerId: string;
/**
* Just if consuming over SCTP.
* Whether data messages must be received in order. If true the messages will
* be sent reliably. Defaults to the value in the DataProducer if it has type
* 'sctp' or to true if it has type 'direct'.
*/
ordered?: boolean;
/**
* Just if consuming over SCTP.
* When ordered is false indicates the time (in milliseconds) after which a
* SCTP packet will stop being retransmitted. Defaults to the value in the
* DataProducer if it has type 'sctp' or unset if it has type 'direct'.
*/
maxPacketLifeTime?: number;
/**
* Just if consuming over SCTP.
* When ordered is false indicates the maximum number of times a packet will
* be retransmitted. Defaults to the value in the DataProducer if it has type
* 'sctp' or unset if it has type 'direct'.
*/
maxRetransmits?: number;
/**
* Custom application data.
*/
Expand All @@ -19,24 +41,31 @@ export declare type DataConsumerStat = {
messagesSent: number;
bytesSent: number;
};
/**
* DataConsumer type.
*/
export declare type DataConsumerType = 'sctp' | 'direct';
export declare class DataConsumer extends EnhancedEventEmitter {
private readonly _internal;
private readonly _data;
private readonly _channel;
private readonly _payloadChannel;
private _closed;
private readonly _appData?;
private readonly _observer;
/**
* @private
* @emits transportclose
* @emits dataproducerclose
* @emits message - (message: Buffer, ppid: number)
* @emits @close
* @emits @dataproducerclose
*/
constructor({ internal, data, channel, appData }: {
constructor({ internal, data, channel, payloadChannel, appData }: {
internal: any;
data: any;
channel: Channel;
payloadChannel: PayloadChannel;
appData: any;
});
/**
Expand All @@ -51,10 +80,14 @@ export declare class DataConsumer extends EnhancedEventEmitter {
* Whether the DataConsumer is closed.
*/
get closed(): boolean;
/**
* DataConsumer type.
*/
get type(): DataConsumerType;
/**
* SCTP stream parameters.
*/
get sctpStreamParameters(): SctpStreamParameters;
get sctpStreamParameters(): SctpStreamParameters | undefined;
/**
* DataChannel label.
*/
Expand Down
2 changes: 1 addition & 1 deletion lib/DataConsumer.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion lib/DataConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
* @private
* @emits transportclose
* @emits dataproducerclose
* @emits message - (message: Buffer, ppid: number)
* @emits @close
* @emits @dataproducerclose
*/
constructor({ internal, data, channel, appData }) {
constructor({ internal, data, channel, payloadChannel, appData }) {
super();
// Closed flag.
this._closed = false;
Expand All @@ -21,6 +22,7 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this._internal = internal;
this._data = data;
this._channel = channel;
this._payloadChannel = payloadChannel;
this._appData = appData;
this._handleWorkerNotifications();
}
Expand All @@ -42,6 +44,12 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
get closed() {
return this._closed;
}
/**
* DataConsumer type.
*/
get type() {
return this._data.type;
}
/**
* SCTP stream parameters.
*/
Expand Down Expand Up @@ -148,6 +156,23 @@ class DataConsumer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
}
}
});
this._payloadChannel.on(this._internal.dataConsumerId, (event, data, payload) => {
switch (event) {
case 'message':
{
if (this._closed)
break;
const ppid = data.ppid;
const message = payload;
this.safeEmit('message', message, ppid);
break;
}
default:
{
logger.error('ignoring unknown event "%s"', event);
}
}
});
}
}
exports.DataConsumer = DataConsumer;
23 changes: 20 additions & 3 deletions lib/DataProducer.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/// <reference types="node" />
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import { Channel } from './Channel';
import { PayloadChannel } from './PayloadChannel';
import { SctpStreamParameters } from './SctpParameters';
export declare type DataProducerOptions = {
/**
Expand All @@ -8,8 +10,9 @@ export declare type DataProducerOptions = {
id?: string;
/**
* SCTP parameters defining how the endpoint is sending the data.
* Just if messages are sent over SCTP.
*/
sctpStreamParameters: SctpStreamParameters;
sctpStreamParameters?: SctpStreamParameters;
/**
* A label which can be used to distinguish this DataChannel from others.
*/
Expand All @@ -31,10 +34,15 @@ export declare type DataProducerStat = {
messagesReceived: number;
bytesReceived: number;
};
/**
* DataProducer type.
*/
export declare type DataProducerType = 'sctp' | 'direct';
export declare class DataProducer extends EnhancedEventEmitter {
private readonly _internal;
private readonly _data;
private readonly _channel;
private readonly _payloadChannel;
private _closed;
private readonly _appData?;
private readonly _observer;
Expand All @@ -43,10 +51,11 @@ export declare class DataProducer extends EnhancedEventEmitter {
* @emits transportclose
* @emits @close
*/
constructor({ internal, data, channel, appData }: {
constructor({ internal, data, channel, payloadChannel, appData }: {
internal: any;
data: any;
channel: Channel;
payloadChannel: PayloadChannel;
appData: any;
});
/**
Expand All @@ -57,10 +66,14 @@ export declare class DataProducer extends EnhancedEventEmitter {
* Whether the DataProducer is closed.
*/
get closed(): boolean;
/**
* DataProducer type.
*/
get type(): DataProducerType;
/**
* SCTP stream parameters.
*/
get sctpStreamParameters(): SctpStreamParameters;
get sctpStreamParameters(): SctpStreamParameters | undefined;
/**
* DataChannel label.
*/
Expand Down Expand Up @@ -101,6 +114,10 @@ export declare class DataProducer extends EnhancedEventEmitter {
* Get DataProducer stats.
*/
getStats(): Promise<DataProducerStat[]>;
/**
* Send data (just valid for DataProducers created on a DirectTransport).
*/
send(message: string | Buffer, ppid?: number): void;
private _handleWorkerNotifications;
}
//# sourceMappingURL=DataProducer.d.ts.map
2 changes: 1 addition & 1 deletion lib/DataProducer.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 44 additions & 1 deletion lib/DataProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
* @emits transportclose
* @emits @close
*/
constructor({ internal, data, channel, appData }) {
constructor({ internal, data, channel, payloadChannel, appData }) {
super();
// Closed flag.
this._closed = false;
Expand All @@ -19,6 +19,7 @@ class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this._internal = internal;
this._data = data;
this._channel = channel;
this._payloadChannel = payloadChannel;
this._appData = appData;
this._handleWorkerNotifications();
}
Expand All @@ -34,6 +35,12 @@ class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
get closed() {
return this._closed;
}
/**
* DataProducer type.
*/
get type() {
return this._data.type;
}
/**
* SCTP stream parameters.
*/
Expand Down Expand Up @@ -116,6 +123,42 @@ class DataProducer extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.debug('getStats()');
return this._channel.request('dataProducer.getStats', this._internal);
}
/**
* Send data (just valid for DataProducers created on a DirectTransport).
*/
send(message, ppid) {
logger.debug('send()');
if (typeof message !== 'string' && !Buffer.isBuffer(message)) {
throw new TypeError('message must be a string or a Buffer');
}
/*
* +-------------------------------+----------+
* | Value | SCTP |
* | | PPID |
* +-------------------------------+----------+
* | WebRTC String | 51 |
* | WebRTC Binary Partial | 52 |
* | (Deprecated) | |
* | WebRTC Binary | 53 |
* | WebRTC String Partial | 54 |
* | (Deprecated) | |
* | WebRTC String Empty | 56 |
* | WebRTC Binary Empty | 57 |
* +-------------------------------+----------+
*/
if (typeof ppid !== 'number') {
ppid = (typeof message === 'string')
? message.length > 0 ? 51 : 56
: message.length > 0 ? 53 : 57;
}
// Ensure we honor PPIDs.
if (ppid === 56)
message = ' ';
else if (ppid === 57)
message = Buffer.alloc(1);
const notifData = { ppid };
this._payloadChannel.notify('dataProducer.send', this._internal, notifData, message);
}
_handleWorkerNotifications() {
// No need to subscribe to any event.
}
Expand Down
Loading