|
@@ -1,6 +1,5 @@
|
|
|
-import { Reliable } from "reliable";
|
|
|
import { util } from "./util";
|
|
|
-import logger, { LogLevel } from "./logger";
|
|
|
+import logger from "./logger";
|
|
|
import { Negotiator } from "./negotiator";
|
|
|
import {
|
|
|
ConnectionType,
|
|
@@ -11,12 +10,14 @@ import {
|
|
|
import { Peer } from "./peer";
|
|
|
import { BaseConnection } from "./baseconnection";
|
|
|
import { ServerMessage } from "./servermessage";
|
|
|
+import { EncodingQueue } from './encodingQueue';
|
|
|
|
|
|
/**
|
|
|
* Wraps a DataChannel between two Peers.
|
|
|
*/
|
|
|
export class DataConnection extends BaseConnection {
|
|
|
private static readonly ID_PREFIX = "dc_";
|
|
|
+ private static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
|
|
|
|
|
|
private _negotiator: Negotiator;
|
|
|
readonly label: string;
|
|
@@ -30,11 +31,17 @@ export class DataConnection extends BaseConnection {
|
|
|
private _buffer: any[] = [];
|
|
|
private _bufferSize = 0;
|
|
|
private _buffering = false;
|
|
|
- private _chunkedData = {};
|
|
|
+ private _chunkedData: {
|
|
|
+ [id: number]: {
|
|
|
+ data: Blob[],
|
|
|
+ count: number,
|
|
|
+ total: number
|
|
|
+ }
|
|
|
+ } = {};
|
|
|
|
|
|
private _peerBrowser: any;
|
|
|
private _dc: RTCDataChannel;
|
|
|
- private _reliable: Reliable;
|
|
|
+ private _encodingQueue = new EncodingQueue();
|
|
|
|
|
|
get dataChannel(): RTCDataChannel {
|
|
|
return this._dc;
|
|
@@ -50,12 +57,21 @@ export class DataConnection extends BaseConnection {
|
|
|
|
|
|
this.label = this.options.label || this.connectionId;
|
|
|
this.serialization = this.options.serialization || SerializationType.Binary;
|
|
|
- this.reliable = this.options.reliable;
|
|
|
+ this.reliable = !!this.options.reliable;
|
|
|
|
|
|
if (this.options._payload) {
|
|
|
this._peerBrowser = this.options._payload.browser;
|
|
|
}
|
|
|
|
|
|
+ this._encodingQueue.on('done', (ab: ArrayBuffer) => {
|
|
|
+ this._bufferedSend(ab);
|
|
|
+ });
|
|
|
+
|
|
|
+ this._encodingQueue.on('error', () => {
|
|
|
+ logger.error(`DC#${this.connectionId}: Error occured in encoding from blob to arraybuffer, close DC`);
|
|
|
+ this.close();
|
|
|
+ });
|
|
|
+
|
|
|
this._negotiator = new Negotiator(this);
|
|
|
|
|
|
this._negotiator.startConnection(
|
|
@@ -77,58 +93,48 @@ export class DataConnection extends BaseConnection {
|
|
|
}
|
|
|
|
|
|
this.dataChannel.onopen = () => {
|
|
|
- logger.log("Data channel connection success");
|
|
|
+ logger.log(`DC#${this.connectionId} dc connection success`);
|
|
|
this._open = true;
|
|
|
this.emit(ConnectionEventType.Open);
|
|
|
};
|
|
|
|
|
|
- // Use the Reliable shim for non Firefox browsers
|
|
|
- if (!util.supports.reliable && this.reliable) {
|
|
|
- const isLoggingEnable = logger.logLevel > LogLevel.Disabled;
|
|
|
- this._reliable = new Reliable(this.dataChannel, isLoggingEnable);
|
|
|
- }
|
|
|
+ this.dataChannel.onmessage = (e) => {
|
|
|
+ logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
|
|
|
+ this._handleDataMessage(e);
|
|
|
+ };
|
|
|
|
|
|
- if (this._reliable) {
|
|
|
- this._reliable.onmessage = (msg) => {
|
|
|
- this.emit(ConnectionEventType.Data, msg);
|
|
|
- };
|
|
|
- } else {
|
|
|
- this.dataChannel.onmessage = (e) => {
|
|
|
- this._handleDataMessage(e);
|
|
|
- };
|
|
|
- }
|
|
|
this.dataChannel.onclose = () => {
|
|
|
- logger.log("DataChannel closed for:", this.peer);
|
|
|
+ logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
|
|
|
this.close();
|
|
|
};
|
|
|
}
|
|
|
|
|
|
// Handles a DataChannel message.
|
|
|
- private _handleDataMessage({ data }): void {
|
|
|
+ private _handleDataMessage({ data }: { data: Blob | ArrayBuffer | string }): void {
|
|
|
const datatype = data.constructor;
|
|
|
|
|
|
const isBinarySerialization = this.serialization === SerializationType.Binary ||
|
|
|
this.serialization === SerializationType.BinaryUTF8;
|
|
|
|
|
|
- let deserializedData = data;
|
|
|
+ let deserializedData: any = data;
|
|
|
|
|
|
if (isBinarySerialization) {
|
|
|
if (datatype === Blob) {
|
|
|
// Datatype should never be blob
|
|
|
- util.blobToArrayBuffer(data, (ab) => {
|
|
|
+ util.blobToArrayBuffer(data as Blob, (ab) => {
|
|
|
const unpackedData = util.unpack(ab);
|
|
|
this.emit(ConnectionEventType.Data, unpackedData);
|
|
|
});
|
|
|
return;
|
|
|
} else if (datatype === ArrayBuffer) {
|
|
|
- deserializedData = util.unpack(data);
|
|
|
+ deserializedData = util.unpack(data as ArrayBuffer);
|
|
|
} else if (datatype === String) {
|
|
|
// String fallback for binary data for browsers that don't support binary yet
|
|
|
- const ab = util.binaryStringToArrayBuffer(data);
|
|
|
+ const ab = util.binaryStringToArrayBuffer(data as string);
|
|
|
deserializedData = util.unpack(ab);
|
|
|
}
|
|
|
} else if (this.serialization === SerializationType.JSON) {
|
|
|
- deserializedData = JSON.parse(data);
|
|
|
+ deserializedData = JSON.parse(data as string);
|
|
|
}
|
|
|
|
|
|
// Check if we've chunked--if so, piece things back together.
|
|
@@ -141,7 +147,7 @@ export class DataConnection extends BaseConnection {
|
|
|
super.emit(ConnectionEventType.Data, deserializedData);
|
|
|
}
|
|
|
|
|
|
- private _handleChunk(data: any): void {
|
|
|
+ private _handleChunk(data: { __peerData: number, n: number, total: number, data: Blob }): void {
|
|
|
const id = data.__peerData;
|
|
|
const chunkInfo = this._chunkedData[id] || {
|
|
|
data: [],
|
|
@@ -184,6 +190,19 @@ export class DataConnection extends BaseConnection {
|
|
|
this.provider = null;
|
|
|
}
|
|
|
|
|
|
+ if (this.dataChannel) {
|
|
|
+ this.dataChannel.onopen = null;
|
|
|
+ this.dataChannel.onmessage = null;
|
|
|
+ this.dataChannel.onclose = null;
|
|
|
+ this._dc = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this._encodingQueue) {
|
|
|
+ this._encodingQueue.destroy();
|
|
|
+ this._encodingQueue.removeAllListeners();
|
|
|
+ this._encodingQueue = null;
|
|
|
+ }
|
|
|
+
|
|
|
if (!this.open) {
|
|
|
return;
|
|
|
}
|
|
@@ -205,13 +224,6 @@ export class DataConnection extends BaseConnection {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (this._reliable) {
|
|
|
- // Note: reliable shim sending will make it so that you cannot customize
|
|
|
- // serialization.
|
|
|
- this._reliable.send(data);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
if (this.serialization === SerializationType.JSON) {
|
|
|
this._bufferedSend(JSON.stringify(data));
|
|
|
} else if (
|
|
@@ -220,28 +232,15 @@ export class DataConnection extends BaseConnection {
|
|
|
) {
|
|
|
const blob = util.pack(data);
|
|
|
|
|
|
- // For Chrome-Firefox interoperability, we need to make Firefox "chunk"
|
|
|
- // the data it sends out.
|
|
|
- const needsChunking =
|
|
|
- util.chunkedBrowsers[this._peerBrowser] ||
|
|
|
- util.chunkedBrowsers[util.browser];
|
|
|
-
|
|
|
- if (needsChunking && !chunked && blob.size > util.chunkedMTU) {
|
|
|
+ if (!chunked && blob.size > util.chunkedMTU) {
|
|
|
this._sendChunks(blob);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // DataChannel currently only supports strings.
|
|
|
- if (!util.supports.sctp) {
|
|
|
- util.blobToBinaryString(blob, (str) => {
|
|
|
- this._bufferedSend(str);
|
|
|
- });
|
|
|
- } else if (!util.supports.binaryBlob) {
|
|
|
+ if (!util.supports.binaryBlob) {
|
|
|
// We only do this if we really need to (e.g. blobs are not supported),
|
|
|
// because this conversion is costly.
|
|
|
- util.blobToArrayBuffer(blob, (ab) => {
|
|
|
- this._bufferedSend(ab);
|
|
|
- });
|
|
|
+ this._encodingQueue.enque(blob);
|
|
|
} else {
|
|
|
this._bufferedSend(blob);
|
|
|
}
|
|
@@ -263,16 +262,23 @@ export class DataConnection extends BaseConnection {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
|
|
|
+ this._buffering = true;
|
|
|
+ setTimeout(() => {
|
|
|
+ this._buffering = false;
|
|
|
+ this._tryBuffer();
|
|
|
+ }, 50);
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
try {
|
|
|
this.dataChannel.send(msg);
|
|
|
} catch (e) {
|
|
|
+ logger.error(`DC#:${this.connectionId} Error when sending:`, e);
|
|
|
this._buffering = true;
|
|
|
|
|
|
- setTimeout(() => {
|
|
|
- // Try again.
|
|
|
- this._buffering = false;
|
|
|
- this._tryBuffer();
|
|
|
- }, 100);
|
|
|
+ this.close();
|
|
|
|
|
|
return false;
|
|
|
}
|
|
@@ -299,8 +305,9 @@ export class DataConnection extends BaseConnection {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private _sendChunks(blob): void {
|
|
|
+ private _sendChunks(blob: Blob): void {
|
|
|
const blobs = util.chunk(blob);
|
|
|
+ logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
|
|
|
|
|
|
for (let blob of blobs) {
|
|
|
this.send(blob, true);
|