import { util } from "./util"; import Negotiator from "./negotiator"; import { Reliable } from "reliable"; import { ConnectionType, ConnectionEventType, SerializationType } from "./enums"; import { Peer } from "./peer"; import { BaseConnection } from "./baseconnection"; import { ServerMessage } from "./servermessage"; /** * Wraps a DataChannel between two Peers. */ export class DataConnection extends BaseConnection { private static readonly ID_PREFIX = "dc_"; readonly label: string; readonly serialization: SerializationType; readonly reliable: boolean; get type() { return ConnectionType.Data; } private _buffer: any[] = []; private _bufferSize = 0; private _buffering = false; private _chunkedData = {}; private _peerBrowser: any; private _dc: RTCDataChannel; private _reliable: Reliable; get dataChannel() { return this._dc; } constructor(peerId: string, provider: Peer, options: any) { super(peerId, provider, options); this.connectionId = options.connectionId || DataConnection.ID_PREFIX + util.randomToken(); this.label = options.label || this.connectionId; this.serialization = options.serialization; this.reliable = options.reliable; if (options._payload) { this._peerBrowser = options._payload.browser; } Negotiator.startConnection( this, options._payload || { originator: true } ); } /** Called by the Negotiator when the DataChannel is ready. */ initialize(dc: RTCDataChannel): void { this._dc = dc; this._configureDataChannel(); } private _configureDataChannel(): void { if (util.supports.sctp) { this.dataChannel.binaryType = "arraybuffer"; } const self = this; this.dataChannel.onopen = function () { util.log("Data channel connection success"); self._open = true; self.emit(ConnectionEventType.Open); }; // Use the Reliable shim for non Firefox browsers if (!util.supports.sctp && this.reliable) { this._reliable = new Reliable(this.dataChannel, util.debug); } if (this._reliable) { this._reliable.onmessage = function (msg) { self.emit(ConnectionEventType.Data, msg); }; } else { this.dataChannel.onmessage = function (e) { self._handleDataMessage(e); }; } this.dataChannel.onclose = function (e) { util.log("DataChannel closed for:", self.peer); self.close(); }; } // Handles a DataChannel message. private _handleDataMessage(e): void { let data = e.data; const datatype = data.constructor; if ( this.serialization === SerializationType.Binary || this.serialization === SerializationType.BinaryUTF8 ) { if (datatype === Blob) { const self = this; // Datatype should never be blob util.blobToArrayBuffer(data, function (ab) { data = util.unpack(ab); self.emit(ConnectionEventType.Data, data); }); return; } else if (datatype === ArrayBuffer) { data = util.unpack(data); } else if (datatype === String) { // String fallback for binary data for browsers that don't support binary yet const ab = util.binaryStringToArrayBuffer(data); data = util.unpack(ab); } } else if (this.serialization === SerializationType.JSON) { data = JSON.parse(data); } // Check if we've chunked--if so, piece things back together. // We're guaranteed that this isn't 0. if (data.__peerData) { const id = data.__peerData; const chunkInfo = this._chunkedData[id] || { data: [], count: 0, total: data.total }; chunkInfo.data[data.n] = data.data; chunkInfo.count++; if (chunkInfo.total === chunkInfo.count) { // Clean up before making the recursive call to `_handleDataMessage`. delete this._chunkedData[id]; // We've received all the chunks--time to construct the complete data. data = new Blob(chunkInfo.data); this._handleDataMessage({ data: data }); } this._chunkedData[id] = chunkInfo; return; } super.emit(ConnectionEventType.Data, data); } /** * Exposed functionality for users. */ /** Allows user to close connection. */ close(): void { if (!this.open) { return; } this._open = false; Negotiator.cleanup(this); super.emit(ConnectionEventType.Close); } /** Allows user to send data. */ send(data: any, chunked: boolean): void { if (!this.open) { super.emit( ConnectionEventType.Error, new Error( "Connection is not open. You should listen for the `open` event before sending messages." ) ); return; } if (this._reliable) { // Note: reliable shim sending will make it so that you cannot customize // serialization. this._reliable.send(data); return; } if (this.serialization === SerializationType.JSON) { this._bufferedSend(JSON.stringify(data)); } else if ( this.serialization === SerializationType.Binary || this.serialization === SerializationType.BinaryUTF8 ) { const blob = util.pack(data); // For Chrome-Firefox interoperability, we need to make Firefox "chunk" // the data it sends out. const needsChunking = util.chunkedBrowsers[this._peerBrowser] || util.chunkedBrowsers[util.browser]; if (needsChunking && !chunked && blob.size > util.chunkedMTU) { this._sendChunks(blob); return; } const self = this; // DataChannel currently only supports strings. if (!util.supports.sctp) { util.blobToBinaryString(blob, function (str) { self._bufferedSend(str); }); } else if (!util.supports.binaryBlob) { // We only do this if we really need to (e.g. blobs are not supported), // because this conversion is costly. util.blobToArrayBuffer(blob, function (ab) { self._bufferedSend(ab); }); } else { this._bufferedSend(blob); } } else { this._bufferedSend(data); } } private _bufferedSend(msg): void { if (this._buffering || !this._trySend(msg)) { this._buffer.push(msg); this._bufferSize = this._buffer.length; } } // Returns true if the send succeeds. private _trySend(msg): boolean { try { this.dataChannel.send(msg); } catch (e) { this._buffering = true; const self = this; setTimeout(function () { // Try again. self._buffering = false; self._tryBuffer(); }, 100); return false; } return true; } // Try to send the first message in the buffer. private _tryBuffer(): void { if (this._buffer.length === 0) { return; } const msg = this._buffer[0]; if (this._trySend(msg)) { this._buffer.shift(); this._bufferSize = this._buffer.length; this._tryBuffer(); } } private _sendChunks(blob): void { const blobs = util.chunk(blob); for (let blob of blobs) { this.send(blob, true); } } handleMessage(message: ServerMessage): void { const payload = message.payload; switch (message.type) { case "ANSWER": this._peerBrowser = payload.browser; // Forward to negotiator Negotiator.handleSDP(message.type, this, payload.sdp); break; case "CANDIDATE": Negotiator.handleCandidate(this, payload.candidate); break; default: util.warn( "Unrecognized message type:", message.type, "from peer:", this.peer ); break; } } }