BinaryJSConnection.ts 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import { concatArrayBuffers, util } from "../../util";
  2. import logger from "../../logger";
  3. import { Peer } from "../../peer";
  4. import { BufferedConnection } from "./BufferedConnection";
  5. import { SerializationType } from "../../enums";
  6. import { Packable, pack, unpack } from "peerjs-js-binarypack";
  /**
   * Buffered data connection whose payloads are serialized with `pack` and
   * deserialized with `unpack` from `peerjs-js-binarypack`.
   *
   * Outgoing payloads whose packed size exceeds `util.chunkedMTU` are split
   * with `util.chunk` and sent piece by piece; incoming pieces are collected
   * in `_chunkedData` and reassembled before being handled as one message.
   */
  export class BinaryJSConnection extends BufferedConnection {
    readonly serialization = SerializationType.Binary;

    /**
     * Partially received chunked messages, keyed by the `__peerData` id
     * carried on each chunk. `count` is the number of distinct chunk indices
     * stored so far; `total` is taken from the first chunk seen for that id.
     */
    private _chunkedData: {
      [id: number]: {
        data: Uint8Array[];
        count: number;
        total: number;
      };
    } = {};

    /**
     * Closes the connection via the base class and drops any partially
     * reassembled chunked messages.
     *
     * @param options - Forwarded unchanged to `super.close`.
     */
    public override close(options?: { flush?: boolean }) {
      super.close(options);
      this._chunkedData = {};
    }

    constructor(peerId: string, provider: Peer, options: any) {
      super(peerId, provider, options);
    }

    /**
     * Handles a DataChannel message.
     *
     * The payload is unpacked, then routed three ways:
     * - a `__peerData` value with `type === "close"` closes the connection;
     * - any other truthy `__peerData` marks a chunk, handed to `_handleChunk`;
     * - everything else is emitted to listeners as a `"data"` event.
     *
     * @param data - The packed bytes received on the channel.
     */
    protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
      const deserializedData = unpack(data);

      // PeerJS specific message.
      // A peer may legitimately send a payload that unpacks to `null` or
      // `undefined`; optional chaining lets such values fall through to the
      // "data" event instead of throwing on the property access.
      const peerData = deserializedData?.["__peerData"];
      if (peerData) {
        if (peerData.type === "close") {
          this.close();
          return;
        }

        // Chunked data -- piece things back together.
        // The unpacked value is not statically known to have the chunk shape.
        // @ts-ignore
        this._handleChunk(deserializedData);
        return;
      }

      this.emit("data", deserializedData);
    }

    /**
     * Stores one chunk of a chunked message and, once every chunk has been
     * received, reassembles the message and feeds it back through
     * `_handleDataMessage`.
     *
     * @param data - One chunk: `__peerData` identifies the message, `n` is
     *   this chunk's index, `total` the number of chunks, `data` the bytes.
     */
    private _handleChunk(data: {
      __peerData: number;
      n: number;
      total: number;
      data: ArrayBuffer;
    }): void {
      const id = data.__peerData;
      const chunkInfo = this._chunkedData[id] ?? {
        data: [],
        count: 0,
        total: data.total,
      };

      // Count each chunk index only once: a repeated chunk must not push
      // `count` up to `total` while another slot is still empty, otherwise
      // reassembly would run on an array with holes.
      if (chunkInfo.data[data.n] === undefined) {
        chunkInfo.count++;
      }
      chunkInfo.data[data.n] = new Uint8Array(data.data);
      this._chunkedData[id] = chunkInfo;

      if (chunkInfo.total === chunkInfo.count) {
        // Clean up before making the recursive call to `_handleDataMessage`.
        delete this._chunkedData[id];

        // We've received all the chunks--time to construct the complete data.
        const complete = concatArrayBuffers(chunkInfo.data);
        this._handleDataMessage({ data: complete });
      }
    }

    /**
     * Packs `data` and sends it, splitting it into chunks first when it is
     * larger than `util.chunkedMTU`.
     *
     * @param data - The value to serialize and send.
     * @param chunked - `true` when `data` is already a chunk produced by
     *   `_sendChunks`; such payloads are never split again.
     */
    protected override _send(
      data: Packable,
      chunked: boolean,
    ): void | Promise<void> {
      const blob = pack(data);

      if (!chunked && blob.byteLength > util.chunkedMTU) {
        this._sendChunks(blob);
        return;
      }

      this._bufferedSend(blob);
    }

    /**
     * Splits a packed payload with `util.chunk` and sends every piece through
     * `send` with the chunked flag set.
     *
     * NOTE(review): presumably `util.chunk` yields objects carrying the
     * `__peerData` / `n` / `total` / `data` fields that `_handleChunk`
     * consumes on the receiving side -- confirm against `util.chunk`.
     *
     * @param blob - The packed payload to split.
     */
    private _sendChunks(blob: ArrayBuffer) {
      const blobs = util.chunk(blob);
      logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);

      for (const chunk of blobs) {
        this.send(chunk, true);
      }
    }
  }