dataconnection.ts 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. import { util } from "./util";
  2. import logger from "./logger";
  3. import { Negotiator } from "./negotiator";
  4. import { ConnectionType, SerializationType, ServerMessageType } from "./enums";
  5. import { Peer } from "./peer";
  6. import { BaseConnection } from "./baseconnection";
  7. import { ServerMessage } from "./servermessage";
  8. import { EncodingQueue } from "./encodingQueue";
  9. import type { DataConnection as IDataConnection } from "./dataconnection";
  10. type DataConnectionEvents = {
  11. /**
  12. * Emitted when data is received from the remote peer.
  13. */
  14. data: (data: unknown) => void;
  15. /**
  16. * Emitted when the connection is established and ready-to-use.
  17. */
  18. open: () => void;
  19. };
  20. /**
  21. * Wraps a DataChannel between two Peers.
  22. */
  23. export class DataConnection
  24. extends BaseConnection<DataConnectionEvents>
  25. implements IDataConnection
  26. {
  27. private static readonly ID_PREFIX = "dc_";
  28. private static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
  29. private _negotiator: Negotiator<DataConnectionEvents, DataConnection>;
  30. readonly label: string;
  31. readonly serialization: SerializationType;
  32. readonly reliable: boolean;
  33. stringify: (data: any) => string = JSON.stringify;
  34. parse: (data: string) => any = JSON.parse;
  35. get type() {
  36. return ConnectionType.Data;
  37. }
  38. private _buffer: any[] = [];
  39. private _bufferSize = 0;
  40. private _buffering = false;
  41. private _chunkedData: {
  42. [id: number]: {
  43. data: Blob[];
  44. count: number;
  45. total: number;
  46. };
  47. } = {};
  48. private _dc: RTCDataChannel;
  49. private _encodingQueue = new EncodingQueue();
  50. get dataChannel(): RTCDataChannel {
  51. return this._dc;
  52. }
  53. get bufferSize(): number {
  54. return this._bufferSize;
  55. }
  56. constructor(peerId: string, provider: Peer, options: any) {
  57. super(peerId, provider, options);
  58. this.connectionId =
  59. this.options.connectionId ||
  60. DataConnection.ID_PREFIX + util.randomToken();
  61. this.label = this.options.label || this.connectionId;
  62. this.serialization = this.options.serialization || SerializationType.Binary;
  63. this.reliable = !!this.options.reliable;
  64. this._encodingQueue.on("done", (ab: ArrayBuffer) => {
  65. this._bufferedSend(ab);
  66. });
  67. this._encodingQueue.on("error", () => {
  68. logger.error(
  69. `DC#${this.connectionId}: Error occured in encoding from blob to arraybuffer, close DC`,
  70. );
  71. this.close();
  72. });
  73. this._negotiator = new Negotiator(this);
  74. this._negotiator.startConnection(
  75. this.options._payload || {
  76. originator: true,
  77. },
  78. );
  79. }
  80. /** Called by the Negotiator when the DataChannel is ready. */
  81. initialize(dc: RTCDataChannel): void {
  82. this._dc = dc;
  83. this._configureDataChannel();
  84. }
  85. private _configureDataChannel(): void {
  86. if (!util.supports.binaryBlob || util.supports.reliable) {
  87. this.dataChannel.binaryType = "arraybuffer";
  88. }
  89. this.dataChannel.onopen = () => {
  90. logger.log(`DC#${this.connectionId} dc connection success`);
  91. this._open = true;
  92. this.emit("open");
  93. };
  94. this.dataChannel.onmessage = (e) => {
  95. logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
  96. this._handleDataMessage(e);
  97. };
  98. this.dataChannel.onclose = () => {
  99. logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
  100. this.close();
  101. };
  102. }
  103. // Handles a DataChannel message.
  104. private _handleDataMessage({
  105. data,
  106. }: {
  107. data: Blob | ArrayBuffer | string;
  108. }): void {
  109. const datatype = data.constructor;
  110. const isBinarySerialization =
  111. this.serialization === SerializationType.Binary ||
  112. this.serialization === SerializationType.BinaryUTF8;
  113. let deserializedData: any = data;
  114. if (isBinarySerialization) {
  115. if (datatype === Blob) {
  116. // Datatype should never be blob
  117. util.blobToArrayBuffer(data as Blob, (ab) => {
  118. const unpackedData = util.unpack(ab);
  119. this.emit("data", unpackedData);
  120. });
  121. return;
  122. } else if (datatype === ArrayBuffer) {
  123. deserializedData = util.unpack(data as ArrayBuffer);
  124. } else if (datatype === String) {
  125. // String fallback for binary data for browsers that don't support binary yet
  126. const ab = util.binaryStringToArrayBuffer(data as string);
  127. deserializedData = util.unpack(ab);
  128. }
  129. } else if (this.serialization === SerializationType.JSON) {
  130. deserializedData = this.parse(data as string);
  131. }
  132. // Check if we've chunked--if so, piece things back together.
  133. // We're guaranteed that this isn't 0.
  134. if (deserializedData.__peerData) {
  135. this._handleChunk(deserializedData);
  136. return;
  137. }
  138. super.emit("data", deserializedData);
  139. }
  140. private _handleChunk(data: {
  141. __peerData: number;
  142. n: number;
  143. total: number;
  144. data: Blob;
  145. }): void {
  146. const id = data.__peerData;
  147. const chunkInfo = this._chunkedData[id] || {
  148. data: [],
  149. count: 0,
  150. total: data.total,
  151. };
  152. chunkInfo.data[data.n] = data.data;
  153. chunkInfo.count++;
  154. this._chunkedData[id] = chunkInfo;
  155. if (chunkInfo.total === chunkInfo.count) {
  156. // Clean up before making the recursive call to `_handleDataMessage`.
  157. delete this._chunkedData[id];
  158. // We've received all the chunks--time to construct the complete data.
  159. const data = new Blob(chunkInfo.data);
  160. this._handleDataMessage({ data });
  161. }
  162. }
  163. /**
  164. * Exposed functionality for users.
  165. */
  166. /** Allows user to close connection. */
  167. close(): void {
  168. this._buffer = [];
  169. this._bufferSize = 0;
  170. this._chunkedData = {};
  171. if (this._negotiator) {
  172. this._negotiator.cleanup();
  173. this._negotiator = null;
  174. }
  175. if (this.provider) {
  176. this.provider._removeConnection(this);
  177. this.provider = null;
  178. }
  179. if (this.dataChannel) {
  180. this.dataChannel.onopen = null;
  181. this.dataChannel.onmessage = null;
  182. this.dataChannel.onclose = null;
  183. this._dc = null;
  184. }
  185. if (this._encodingQueue) {
  186. this._encodingQueue.destroy();
  187. this._encodingQueue.removeAllListeners();
  188. this._encodingQueue = null;
  189. }
  190. if (!this.open) {
  191. return;
  192. }
  193. this._open = false;
  194. super.emit("close");
  195. }
  196. /** Allows user to send data. */
  197. send(data: any, chunked?: boolean): void {
  198. if (!this.open) {
  199. super.emit(
  200. "error",
  201. new Error(
  202. "Connection is not open. You should listen for the `open` event before sending messages.",
  203. ),
  204. );
  205. return;
  206. }
  207. if (this.serialization === SerializationType.JSON) {
  208. this._bufferedSend(this.stringify(data));
  209. } else if (
  210. this.serialization === SerializationType.Binary ||
  211. this.serialization === SerializationType.BinaryUTF8
  212. ) {
  213. const blob = util.pack(data);
  214. if (!chunked && blob.size > util.chunkedMTU) {
  215. this._sendChunks(blob);
  216. return;
  217. }
  218. if (!util.supports.binaryBlob) {
  219. // We only do this if we really need to (e.g. blobs are not supported),
  220. // because this conversion is costly.
  221. this._encodingQueue.enque(blob);
  222. } else {
  223. this._bufferedSend(blob);
  224. }
  225. } else {
  226. this._bufferedSend(data);
  227. }
  228. }
  229. private _bufferedSend(msg: any): void {
  230. if (this._buffering || !this._trySend(msg)) {
  231. this._buffer.push(msg);
  232. this._bufferSize = this._buffer.length;
  233. }
  234. }
  235. // Returns true if the send succeeds.
  236. private _trySend(msg: any): boolean {
  237. if (!this.open) {
  238. return false;
  239. }
  240. if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
  241. this._buffering = true;
  242. setTimeout(() => {
  243. this._buffering = false;
  244. this._tryBuffer();
  245. }, 50);
  246. return false;
  247. }
  248. try {
  249. this.dataChannel.send(msg);
  250. } catch (e) {
  251. logger.error(`DC#:${this.connectionId} Error when sending:`, e);
  252. this._buffering = true;
  253. this.close();
  254. return false;
  255. }
  256. return true;
  257. }
  258. // Try to send the first message in the buffer.
  259. private _tryBuffer(): void {
  260. if (!this.open) {
  261. return;
  262. }
  263. if (this._buffer.length === 0) {
  264. return;
  265. }
  266. const msg = this._buffer[0];
  267. if (this._trySend(msg)) {
  268. this._buffer.shift();
  269. this._bufferSize = this._buffer.length;
  270. this._tryBuffer();
  271. }
  272. }
  273. private _sendChunks(blob: Blob): void {
  274. const blobs = util.chunk(blob);
  275. logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
  276. for (let blob of blobs) {
  277. this.send(blob, true);
  278. }
  279. }
  280. handleMessage(message: ServerMessage): void {
  281. const payload = message.payload;
  282. switch (message.type) {
  283. case ServerMessageType.Answer:
  284. this._negotiator.handleSDP(message.type, payload.sdp);
  285. break;
  286. case ServerMessageType.Candidate:
  287. this._negotiator.handleCandidate(payload.candidate);
  288. break;
  289. default:
  290. logger.warn(
  291. "Unrecognized message type:",
  292. message.type,
  293. "from peer:",
  294. this.peer,
  295. );
  296. break;
  297. }
  298. }
  299. }