Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datachannel): sending order is now preserved correctly #1038

Merged
merged 1 commit into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions e2e/datachannel/serialization.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ describe("DataChannel:Default", () => {
await P.init();
});
it("should transfer numbers", serializationTest("./numbers.js"));
/** ordering bug: chunked string not in order */
// it('should transfer strings', serializationTest("./strings.js"))
it("should transfer strings", serializationTest("./strings.js"));
it("should transfer objects", serializationTest("./objects.js"));
it("should transfer arrays", serializationTest("./arrays.js"));
/** can't send bug */
// it('should transfer typed arrays / array buffers', serializationTest("./arraybuffers.js"))
it(
"should transfer typed arrays / array buffers",
serializationTest("./arraybuffers.js"),
);
});
33 changes: 15 additions & 18 deletions lib/dataconnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { util } from "./util";
import { concatArrayBuffers, util } from "./util";
import logger from "./logger";
import { Negotiator } from "./negotiator";
import { ConnectionType, SerializationType, ServerMessageType } from "./enums";
Expand Down Expand Up @@ -65,7 +65,7 @@ export class DataConnection
private _buffering = false;
private _chunkedData: {
[id: number]: {
data: Blob[];
data: Uint8Array[];
count: number;
total: number;
};
Expand Down Expand Up @@ -119,9 +119,7 @@ export class DataConnection
/** Called by the Negotiator when the DataChannel is ready. */
override _initializeDataChannel(dc: RTCDataChannel): void {
this._dc = dc;
if (!util.supports.binaryBlob || util.supports.reliable) {
this.dataChannel.binaryType = "arraybuffer";
}
this.dataChannel.binaryType = "arraybuffer";

this.dataChannel.onopen = () => {
logger.log(`DC#${this.connectionId} dc connection success`);
Expand Down Expand Up @@ -193,7 +191,7 @@ export class DataConnection
__peerData: number;
n: number;
total: number;
data: Blob;
data: ArrayBuffer;
}): void {
const id = data.__peerData;
const chunkInfo = this._chunkedData[id] || {
Expand All @@ -202,7 +200,7 @@ export class DataConnection
total: data.total,
};

chunkInfo.data[data.n] = data.data;
chunkInfo.data[data.n] = new Uint8Array(data.data);
chunkInfo.count++;
this._chunkedData[id] = chunkInfo;

Expand All @@ -211,8 +209,8 @@ export class DataConnection
delete this._chunkedData[id];

// We've received all the chunks--time to construct the complete data.
const data = new Blob(chunkInfo.data);
this._handleDataMessage({ data });
const data = concatArrayBuffers(chunkInfo.data);
this.emit("data", util.unpack(data));
}
}

Expand Down Expand Up @@ -283,6 +281,11 @@ export class DataConnection
return;
}

if (data instanceof Blob) {
data.arrayBuffer().then((ab) => this.send(ab));
return;
}

if (this.serialization === SerializationType.JSON) {
this._bufferedSend(this.stringify(data));
} else if (
Expand All @@ -291,18 +294,12 @@ export class DataConnection
) {
const blob = util.pack(data);

if (!chunked && blob.size > util.chunkedMTU) {
if (!chunked && blob.byteLength > util.chunkedMTU) {
this._sendChunks(blob);
return;
}

if (!util.supports.binaryBlob) {
// We only do this if we really need to (e.g. blobs are not supported),
// because this conversion is costly.
this._encodingQueue.enque(blob);
} else {
this._bufferedSend(blob);
}
this._bufferedSend(blob);
} else {
this._bufferedSend(data);
}
Expand Down Expand Up @@ -364,7 +361,7 @@ export class DataConnection
}
}

private _sendChunks(blob: Blob): void {
private _sendChunks(blob: ArrayBuffer): void {
const blobs = util.chunk(blob);
logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);

Expand Down
19 changes: 16 additions & 3 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ export class Util {
private _dataCount: number = 1;

chunk(
blob: Blob,
): { __peerData: number; n: number; total: number; data: Blob }[] {
blob: ArrayBuffer,
): { __peerData: number; n: number; total: number; data: ArrayBuffer }[] {
const chunks = [];
const size = blob.size;
const size = blob.byteLength;
const total = Math.ceil(size / util.chunkedMTU);

let index = 0;
Expand Down Expand Up @@ -208,3 +208,16 @@ export class Util {
* :::
*/
export const util = new Util();
export function concatArrayBuffers(bufs: Uint8Array[]) {
let size = 0;
for (const buf of bufs) {
size += buf.byteLength;
}
const result = new Uint8Array(size);
let offset = 0;
for (const buf of bufs) {
result.set(buf, offset);
offset += buf.byteLength;
}
return result;
}
21 changes: 14 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@
"dependencies": {
"@swc/helpers": "^0.5.0",
"eventemitter3": "^4.0.7",
"peerjs-js-binarypack": "1.0.2",
"peerjs-js-binarypack": "^2.0.0",
"webrtc-adapter": "^8.0.0"
}
}