// BinaryPack.ts
  1. import { BinaryPackChunker, concatArrayBuffers } from "./binaryPackChunker";
  2. import logger from "../../logger";
  3. import type { Peer } from "../../peer";
  4. import { BufferedConnection } from "./BufferedConnection";
  5. import { SerializationType } from "../../enums";
  6. import { pack, type Packable, unpack } from "peerjs-js-binarypack";
  /**
   * Buffered data connection that serializes payloads with BinaryPack
   * (`SerializationType.Binary`).
   *
   * Outgoing payloads whose packed size exceeds `chunker.chunkedMTU` are split
   * into chunks by `BinaryPackChunker`; incoming chunks are collected per
   * message id and re-assembled before the `data` event is emitted.
   */
  export class BinaryPack extends BufferedConnection {
    private readonly chunker = new BinaryPackChunker();
    readonly serialization = SerializationType.Binary;

    /** Partially received chunked messages, keyed by their `__peerData` id. */
    private _chunkedData: {
      [id: number]: {
        data: Uint8Array[];
        count: number;
        total: number;
      };
    } = {};

    /**
     * Closes the connection via the base class and drops every partially
     * received chunked message.
     *
     * @param options - Forwarded unchanged to `super.close`.
     */
    public override close(options?: { flush?: boolean }) {
      super.close(options);
      this._chunkedData = {};
    }

    constructor(peerId: string, provider: Peer, options: any) {
      super(peerId, provider, options);
    }

    /**
     * Handles a DataChannel message.
     *
     * Unpacks the payload and then either closes the connection (PeerJS
     * `close` control message), feeds a chunk into re-assembly, or emits the
     * value through the `data` event.
     *
     * @param message - Object whose `data` holds the packed bytes.
     */
    protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
      const deserializedData = unpack(data);

      // PeerJS specific message.
      // The payload may unpack to `null`/`undefined` (e.g. the remote peer sent
      // `null`); optional chaining keeps the lookup from throwing a TypeError so
      // that such values are still delivered through the `data` event.
      const peerData = deserializedData?.["__peerData"];
      if (peerData) {
        if (peerData.type === "close") {
          this.close();
          return;
        }

        // Chunked data -- piece things back together.
        // The unpacked value is not statically known to have the chunk shape.
        // @ts-ignore
        this._handleChunk(deserializedData);
        return;
      }

      this.emit("data", deserializedData);
    }

    /**
     * Stores one chunk of a chunked message and, once `total` chunks have been
     * counted for its id, concatenates them and processes the result as a
     * regular message.
     *
     * @param data - Chunk envelope: `__peerData` is the message id, `n` the
     * chunk index, `total` the number of chunks and `data` the chunk bytes.
     */
    private _handleChunk(data: {
      __peerData: number;
      n: number;
      total: number;
      data: ArrayBuffer;
    }): void {
      const id = data.__peerData;
      // Reuse the record for this id, or start a new one on its first chunk.
      const chunkInfo = this._chunkedData[id] ?? {
        data: [],
        count: 0,
        total: data.total,
      };

      // Chunks are stored by index, so arrival order does not matter.
      chunkInfo.data[data.n] = new Uint8Array(data.data);
      chunkInfo.count++;
      this._chunkedData[id] = chunkInfo;

      if (chunkInfo.total === chunkInfo.count) {
        // Clean up before making the recursive call to `_handleDataMessage`.
        delete this._chunkedData[id];

        // We've received all the chunks--time to construct the complete data.
        const assembled = concatArrayBuffers(chunkInfo.data);
        this._handleDataMessage({ data: assembled });
      }
    }

    /**
     * Packs `data` and queues it for sending, splitting it into chunks first
     * when it is larger than `chunker.chunkedMTU`.
     *
     * @param data - Value to send. A `Blob` is first read into an ArrayBuffer.
     * @param chunked - `true` when `data` is already one chunk of a larger
     * message and must not be split again.
     * @returns A promise when `data` is a `Blob` (settles once the blob has been
     * read and handed on); otherwise nothing.
     */
    protected override _send(
      data: Packable,
      chunked: boolean,
    ): void | Promise<void> {
      if (data instanceof Blob) {
        return data.arrayBuffer().then((buffer) => {
          this._send(buffer, chunked);
        });
      }

      const packed = pack(data);

      if (!chunked && packed.byteLength > this.chunker.chunkedMTU) {
        this._sendChunks(packed);
        return;
      }

      this._bufferedSend(packed);
    }

    /**
     * Splits an oversized packed payload with the chunker and sends every
     * resulting chunk through `send` with the `chunked` flag set.
     *
     * @param blob - Packed payload to split.
     */
    private _sendChunks(blob: ArrayBuffer) {
      const chunks = this.chunker.chunk(blob);
      logger.log(`DC#${this.connectionId} Try to send ${chunks.length} chunks...`);

      for (const chunk of chunks) {
        this.send(chunk, true);
      }
    }
  }