Переглянути джерело

fix(datachannel): sending order is now preserved correctly (#1038)

This upgrades `binarypack` to version 2.

`pack` now returns `ArrayBuffer`s which can be sent over all data
channel types without asynchronous conversion.
This implementation is also much faster than version 1.

Closes #746
Jonas Gloning 2 роки тому
батько
коміт
0fb61792ed
5 змінених файлів з 51 додано та 33 видалено
  1. 5 4
      e2e/datachannel/serialization.spec.ts
  2. 15 18
      lib/dataconnection.ts
  3. 16 3
      lib/util.ts
  4. 14 7
      package-lock.json
  5. 1 1
      package.json

+ 5 - 4
e2e/datachannel/serialization.spec.ts

@@ -25,10 +25,11 @@ describe("DataChannel:Default", () => {
 		await P.init();
 	});
 	it("should transfer numbers", serializationTest("./numbers.js"));
-	/** ordering bug: chunked string not in order */
-	// it('should transfer strings', serializationTest("./strings.js"))
+	it("should transfer strings", serializationTest("./strings.js"));
 	it("should transfer objects", serializationTest("./objects.js"));
 	it("should transfer arrays", serializationTest("./arrays.js"));
-	/** can't send bug */
-	// it('should transfer typed arrays / array buffers', serializationTest("./arraybuffers.js"))
+	it(
+		"should transfer typed arrays / array buffers",
+		serializationTest("./arraybuffers.js"),
+	);
 });

+ 15 - 18
lib/dataconnection.ts

@@ -1,4 +1,4 @@
-import { util } from "./util";
+import { concatArrayBuffers, util } from "./util";
 import logger from "./logger";
 import { Negotiator } from "./negotiator";
 import { ConnectionType, SerializationType, ServerMessageType } from "./enums";
@@ -65,7 +65,7 @@ export class DataConnection
 	private _buffering = false;
 	private _chunkedData: {
 		[id: number]: {
-			data: Blob[];
+			data: Uint8Array[];
 			count: number;
 			total: number;
 		};
@@ -119,9 +119,7 @@ export class DataConnection
 	/** Called by the Negotiator when the DataChannel is ready. */
 	override _initializeDataChannel(dc: RTCDataChannel): void {
 		this._dc = dc;
-		if (!util.supports.binaryBlob || util.supports.reliable) {
-			this.dataChannel.binaryType = "arraybuffer";
-		}
+		this.dataChannel.binaryType = "arraybuffer";
 
 		this.dataChannel.onopen = () => {
 			logger.log(`DC#${this.connectionId} dc connection success`);
@@ -193,7 +191,7 @@ export class DataConnection
 		__peerData: number;
 		n: number;
 		total: number;
-		data: Blob;
+		data: ArrayBuffer;
 	}): void {
 		const id = data.__peerData;
 		const chunkInfo = this._chunkedData[id] || {
@@ -202,7 +200,7 @@ export class DataConnection
 			total: data.total,
 		};
 
-		chunkInfo.data[data.n] = data.data;
+		chunkInfo.data[data.n] = new Uint8Array(data.data);
 		chunkInfo.count++;
 		this._chunkedData[id] = chunkInfo;
 
@@ -211,8 +209,8 @@ export class DataConnection
 			delete this._chunkedData[id];
 
 			// We've received all the chunks--time to construct the complete data.
-			const data = new Blob(chunkInfo.data);
-			this._handleDataMessage({ data });
+			const data = concatArrayBuffers(chunkInfo.data);
+			this.emit("data", util.unpack(data));
 		}
 	}
 
@@ -283,6 +281,11 @@ export class DataConnection
 			return;
 		}
 
+		if (data instanceof Blob) {
+			data.arrayBuffer().then((ab) => this.send(ab));
+			return;
+		}
+
 		if (this.serialization === SerializationType.JSON) {
 			this._bufferedSend(this.stringify(data));
 		} else if (
@@ -291,18 +294,12 @@ export class DataConnection
 		) {
 			const blob = util.pack(data);
 
-			if (!chunked && blob.size > util.chunkedMTU) {
+			if (!chunked && blob.byteLength > util.chunkedMTU) {
 				this._sendChunks(blob);
 				return;
 			}
 
-			if (!util.supports.binaryBlob) {
-				// We only do this if we really need to (e.g. blobs are not supported),
-				// because this conversion is costly.
-				this._encodingQueue.enque(blob);
-			} else {
-				this._bufferedSend(blob);
-			}
+			this._bufferedSend(blob);
 		} else {
 			this._bufferedSend(data);
 		}
@@ -364,7 +361,7 @@ export class DataConnection
 		}
 	}
 
-	private _sendChunks(blob: Blob): void {
+	private _sendChunks(blob: ArrayBuffer): void {
 		const blobs = util.chunk(blob);
 		logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
 

+ 16 - 3
lib/util.ts

@@ -131,10 +131,10 @@ export class Util {
 	private _dataCount: number = 1;
 
 	chunk(
-		blob: Blob,
-	): { __peerData: number; n: number; total: number; data: Blob }[] {
+		blob: ArrayBuffer,
+	): { __peerData: number; n: number; total: number; data: ArrayBuffer }[] {
 		const chunks = [];
-		const size = blob.size;
+		const size = blob.byteLength;
 		const total = Math.ceil(size / util.chunkedMTU);
 
 		let index = 0;
@@ -208,3 +208,16 @@ export class Util {
  * :::
  */
 export const util = new Util();
+export function concatArrayBuffers(bufs: Uint8Array[]) {
+	let size = 0;
+	for (const buf of bufs) {
+		size += buf.byteLength;
+	}
+	const result = new Uint8Array(size);
+	let offset = 0;
+	for (const buf of bufs) {
+		result.set(buf, offset);
+		offset += buf.byteLength;
+	}
+	return result;
+}

+ 14 - 7
package-lock.json

@@ -11,7 +11,7 @@
 			"dependencies": {
 				"@swc/helpers": "^0.5.0",
 				"eventemitter3": "^4.0.7",
-				"peerjs-js-binarypack": "1.0.2",
+				"peerjs-js-binarypack": "^2.0.0",
 				"webrtc-adapter": "^8.0.0"
 			},
 			"devDependencies": {
@@ -14550,9 +14550,16 @@
 			}
 		},
 		"node_modules/peerjs-js-binarypack": {
-			"version": "1.0.2",
-			"resolved": "https://registry.npmjs.org/peerjs-js-binarypack/-/peerjs-js-binarypack-1.0.2.tgz",
-			"integrity": "sha512-/5OcAiSN8R/Hd1YCGQ+EcBtq5S10BZekZ85Cmwzf1DYM34y6RYXpRxRTCYL1bQH3GXNBr/tQftMKUw4UNb+ZMg=="
+			"version": "2.0.0",
+			"resolved": "https://registry.npmjs.org/peerjs-js-binarypack/-/peerjs-js-binarypack-2.0.0.tgz",
+			"integrity": "sha512-wu+L0Qeg4IH2DXm3B6xKP5ODeCIovwEEO/Fu3MVqApPQeVLzSdZpFzQzPobh+sdhUWMQGEO7YxHeiwpPngLjqQ==",
+			"engines": {
+				"node": ">= 14.0.0"
+			},
+			"funding": {
+				"type": "opencollective",
+				"url": "https://opencollective.com/peer"
+			}
 		},
 		"node_modules/pend": {
 			"version": "1.2.0",
@@ -28503,9 +28510,9 @@
 			}
 		},
 		"peerjs-js-binarypack": {
-			"version": "1.0.2",
-			"resolved": "https://registry.npmjs.org/peerjs-js-binarypack/-/peerjs-js-binarypack-1.0.2.tgz",
-			"integrity": "sha512-/5OcAiSN8R/Hd1YCGQ+EcBtq5S10BZekZ85Cmwzf1DYM34y6RYXpRxRTCYL1bQH3GXNBr/tQftMKUw4UNb+ZMg=="
+			"version": "2.0.0",
+			"resolved": "https://registry.npmjs.org/peerjs-js-binarypack/-/peerjs-js-binarypack-2.0.0.tgz",
+			"integrity": "sha512-wu+L0Qeg4IH2DXm3B6xKP5ODeCIovwEEO/Fu3MVqApPQeVLzSdZpFzQzPobh+sdhUWMQGEO7YxHeiwpPngLjqQ=="
 		},
 		"pend": {
 			"version": "1.2.0",

+ 1 - 1
package.json

@@ -195,7 +195,7 @@
 	"dependencies": {
 		"@swc/helpers": "^0.5.0",
 		"eventemitter3": "^4.0.7",
-		"peerjs-js-binarypack": "1.0.2",
+		"peerjs-js-binarypack": "^2.0.0",
 		"webrtc-adapter": "^8.0.0"
 	}
 }