dataconnection.ts 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. import { util } from "./util";
  2. import logger from "./logger";
  3. import { Negotiator } from "./negotiator";
  4. import { ConnectionType, SerializationType, ServerMessageType } from "./enums";
  5. import { Peer } from "./peer";
  6. import { BaseConnection } from "./baseconnection";
  7. import { ServerMessage } from "./servermessage";
  8. import { EncodingQueue } from "./encodingQueue";
  9. import type { DataConnection as IDataConnection } from "./dataconnection";
  10. export type DataConnectionEvents = {
  11. /**
  12. * Emitted when data is received from the remote peer.
  13. *
  14. * ```ts
  15. * dataConnection.on('data', (data) => { ... });
  16. * ```
  17. */
  18. data: (data: unknown) => void;
  19. /**
  20. * Emitted when the connection is established and ready-to-use.
  21. *
  22. * ```ts
  23. * dataConnection.on('open', () => { ... });
  24. * ```
  25. */
  26. open: () => void;
  27. };
  28. /**
  29. * Wraps WebRTC's DataChannel.
  30. * To get one, use {@apilink Peer.connect} or listen for the {@apilink PeerEvents | `connect`} event.
  31. */
  32. export class DataConnection
  33. extends BaseConnection<DataConnectionEvents>
  34. implements IDataConnection
  35. {
  36. private static readonly ID_PREFIX = "dc_";
  37. private static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
  38. private _negotiator: Negotiator<DataConnectionEvents, DataConnection>;
  39. /**
  40. * The optional label passed in or assigned by PeerJS when the connection was initiated.
  41. */
  42. readonly label: string;
  43. /**
  44. * The serialization format of the data sent over the connection.
  45. * {@apilink SerializationType | possible values}
  46. */
  47. readonly serialization: SerializationType;
  48. /**
  49. * Whether the underlying data channels are reliable; defined when the connection was initiated.
  50. */
  51. readonly reliable: boolean;
  52. stringify: (data: any) => string = JSON.stringify;
  53. parse: (data: string) => any = JSON.parse;
  54. get type() {
  55. return ConnectionType.Data;
  56. }
  57. private _buffer: any[] = [];
  58. /**
  59. * The number of messages queued to be sent once the browser buffer is no longer full.
  60. */
  61. private _bufferSize = 0;
  62. private _buffering = false;
  63. private _chunkedData: {
  64. [id: number]: {
  65. data: Blob[];
  66. count: number;
  67. total: number;
  68. };
  69. } = {};
  70. private _dc: RTCDataChannel;
  71. private _encodingQueue = new EncodingQueue();
  72. /**
  73. * A reference to the RTCDataChannel object associated with the connection.
  74. */
  75. get dataChannel(): RTCDataChannel {
  76. return this._dc;
  77. }
  78. get bufferSize(): number {
  79. return this._bufferSize;
  80. }
  81. constructor(peerId: string, provider: Peer, options: any) {
  82. super(peerId, provider, options);
  83. this.connectionId =
  84. this.options.connectionId ||
  85. DataConnection.ID_PREFIX + util.randomToken();
  86. this.label = this.options.label || this.connectionId;
  87. this.serialization = this.options.serialization || SerializationType.Binary;
  88. this.reliable = !!this.options.reliable;
  89. this._encodingQueue.on("done", (ab: ArrayBuffer) => {
  90. this._bufferedSend(ab);
  91. });
  92. this._encodingQueue.on("error", () => {
  93. logger.error(
  94. `DC#${this.connectionId}: Error occured in encoding from blob to arraybuffer, close DC`,
  95. );
  96. this.close();
  97. });
  98. this._negotiator = new Negotiator(this);
  99. this._negotiator.startConnection(
  100. this.options._payload || {
  101. originator: true,
  102. },
  103. );
  104. }
  105. /** Called by the Negotiator when the DataChannel is ready. */
  106. initialize(dc: RTCDataChannel): void {
  107. this._dc = dc;
  108. this._configureDataChannel();
  109. }
  110. private _configureDataChannel(): void {
  111. if (!util.supports.binaryBlob || util.supports.reliable) {
  112. this.dataChannel.binaryType = "arraybuffer";
  113. }
  114. this.dataChannel.onopen = () => {
  115. logger.log(`DC#${this.connectionId} dc connection success`);
  116. this._open = true;
  117. this.emit("open");
  118. };
  119. this.dataChannel.onmessage = (e) => {
  120. logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
  121. this._handleDataMessage(e);
  122. };
  123. this.dataChannel.onclose = () => {
  124. logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
  125. this.close();
  126. };
  127. }
  128. // Handles a DataChannel message.
  129. private _handleDataMessage({
  130. data,
  131. }: {
  132. data: Blob | ArrayBuffer | string;
  133. }): void {
  134. const datatype = data.constructor;
  135. const isBinarySerialization =
  136. this.serialization === SerializationType.Binary ||
  137. this.serialization === SerializationType.BinaryUTF8;
  138. let deserializedData: any = data;
  139. if (isBinarySerialization) {
  140. if (datatype === Blob) {
  141. // Datatype should never be blob
  142. util.blobToArrayBuffer(data as Blob, (ab) => {
  143. const unpackedData = util.unpack(ab);
  144. this.emit("data", unpackedData);
  145. });
  146. return;
  147. } else if (datatype === ArrayBuffer) {
  148. deserializedData = util.unpack(data as ArrayBuffer);
  149. } else if (datatype === String) {
  150. // String fallback for binary data for browsers that don't support binary yet
  151. const ab = util.binaryStringToArrayBuffer(data as string);
  152. deserializedData = util.unpack(ab);
  153. }
  154. } else if (this.serialization === SerializationType.JSON) {
  155. deserializedData = this.parse(data as string);
  156. }
  157. // Check if we've chunked--if so, piece things back together.
  158. // We're guaranteed that this isn't 0.
  159. if (deserializedData.__peerData) {
  160. this._handleChunk(deserializedData);
  161. return;
  162. }
  163. super.emit("data", deserializedData);
  164. }
  165. private _handleChunk(data: {
  166. __peerData: number;
  167. n: number;
  168. total: number;
  169. data: Blob;
  170. }): void {
  171. const id = data.__peerData;
  172. const chunkInfo = this._chunkedData[id] || {
  173. data: [],
  174. count: 0,
  175. total: data.total,
  176. };
  177. chunkInfo.data[data.n] = data.data;
  178. chunkInfo.count++;
  179. this._chunkedData[id] = chunkInfo;
  180. if (chunkInfo.total === chunkInfo.count) {
  181. // Clean up before making the recursive call to `_handleDataMessage`.
  182. delete this._chunkedData[id];
  183. // We've received all the chunks--time to construct the complete data.
  184. const data = new Blob(chunkInfo.data);
  185. this._handleDataMessage({ data });
  186. }
  187. }
  188. /**
  189. * Exposed functionality for users.
  190. */
  191. /** Allows user to close connection. */
  192. close(): void {
  193. this._buffer = [];
  194. this._bufferSize = 0;
  195. this._chunkedData = {};
  196. if (this._negotiator) {
  197. this._negotiator.cleanup();
  198. this._negotiator = null;
  199. }
  200. if (this.provider) {
  201. this.provider._removeConnection(this);
  202. this.provider = null;
  203. }
  204. if (this.dataChannel) {
  205. this.dataChannel.onopen = null;
  206. this.dataChannel.onmessage = null;
  207. this.dataChannel.onclose = null;
  208. this._dc = null;
  209. }
  210. if (this._encodingQueue) {
  211. this._encodingQueue.destroy();
  212. this._encodingQueue.removeAllListeners();
  213. this._encodingQueue = null;
  214. }
  215. if (!this.open) {
  216. return;
  217. }
  218. this._open = false;
  219. super.emit("close");
  220. }
  221. /**
  222. * `data` is serialized and sent to the remote peer.
  223. * @param data You can send any type of data, including objects, strings, and blobs.
  224. * @returns
  225. */
  226. send(data: any, chunked?: boolean): void {
  227. if (!this.open) {
  228. super.emit(
  229. "error",
  230. new Error(
  231. "Connection is not open. You should listen for the `open` event before sending messages.",
  232. ),
  233. );
  234. return;
  235. }
  236. if (this.serialization === SerializationType.JSON) {
  237. this._bufferedSend(this.stringify(data));
  238. } else if (
  239. this.serialization === SerializationType.Binary ||
  240. this.serialization === SerializationType.BinaryUTF8
  241. ) {
  242. const blob = util.pack(data);
  243. if (!chunked && blob.size > util.chunkedMTU) {
  244. this._sendChunks(blob);
  245. return;
  246. }
  247. if (!util.supports.binaryBlob) {
  248. // We only do this if we really need to (e.g. blobs are not supported),
  249. // because this conversion is costly.
  250. this._encodingQueue.enque(blob);
  251. } else {
  252. this._bufferedSend(blob);
  253. }
  254. } else {
  255. this._bufferedSend(data);
  256. }
  257. }
  258. private _bufferedSend(msg: any): void {
  259. if (this._buffering || !this._trySend(msg)) {
  260. this._buffer.push(msg);
  261. this._bufferSize = this._buffer.length;
  262. }
  263. }
  264. // Returns true if the send succeeds.
  265. private _trySend(msg: any): boolean {
  266. if (!this.open) {
  267. return false;
  268. }
  269. if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
  270. this._buffering = true;
  271. setTimeout(() => {
  272. this._buffering = false;
  273. this._tryBuffer();
  274. }, 50);
  275. return false;
  276. }
  277. try {
  278. this.dataChannel.send(msg);
  279. } catch (e) {
  280. logger.error(`DC#:${this.connectionId} Error when sending:`, e);
  281. this._buffering = true;
  282. this.close();
  283. return false;
  284. }
  285. return true;
  286. }
  287. // Try to send the first message in the buffer.
  288. private _tryBuffer(): void {
  289. if (!this.open) {
  290. return;
  291. }
  292. if (this._buffer.length === 0) {
  293. return;
  294. }
  295. const msg = this._buffer[0];
  296. if (this._trySend(msg)) {
  297. this._buffer.shift();
  298. this._bufferSize = this._buffer.length;
  299. this._tryBuffer();
  300. }
  301. }
  302. private _sendChunks(blob: Blob): void {
  303. const blobs = util.chunk(blob);
  304. logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
  305. for (const blob of blobs) {
  306. this.send(blob, true);
  307. }
  308. }
  309. handleMessage(message: ServerMessage): void {
  310. const payload = message.payload;
  311. switch (message.type) {
  312. case ServerMessageType.Answer:
  313. this._negotiator.handleSDP(message.type, payload.sdp);
  314. break;
  315. case ServerMessageType.Candidate:
  316. this._negotiator.handleCandidate(payload.candidate);
  317. break;
  318. default:
  319. logger.warn(
  320. "Unrecognized message type:",
  321. message.type,
  322. "from peer:",
  323. this.peer,
  324. );
  325. break;
  326. }
  327. }
  328. }