StreamConnection.ts 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import logger from "../../logger.js";
  2. import type { Peer } from "../../peer.js";
  3. import { DataConnection } from "../DataConnection.js";
  /**
   * Base class for data connections that expose the underlying
   * `RTCDataChannel` through WHATWG streams.
   *
   * Outgoing data written to {@link StreamConnection.writer} is cut into
   * chunks of at most `_CHUNK_SIZE` bytes and piped into a sink that sends
   * each chunk over the data channel, waiting for the channel's
   * `bufferedamountlow` event whenever the send buffer has no room for it.
   * Incoming `message` payloads are enqueued on `_rawReadStream` once the
   * connection has emitted `open`.
   */
  export abstract class StreamConnection extends DataConnection {
    /** Maximum size, in bytes, of a single chunk handed to the data channel (32 KiB). */
    private readonly _CHUNK_SIZE = 1024 * 8 * 4;

    /** Splits every incoming `Uint8Array` into views of at most `_CHUNK_SIZE` bytes. */
    private _splitStream = new TransformStream<Uint8Array>({
      transform: (chunk, controller) => {
        for (let split = 0; split < chunk.length; split += this._CHUNK_SIZE) {
          // `subarray` creates a view on the same buffer; no bytes are copied.
          controller.enqueue(chunk.subarray(split, split + this._CHUNK_SIZE));
        }
      },
    });

    /** Sink that pushes chunks into the data channel with simple backpressure. */
    private _rawSendStream = new WritableStream<ArrayBuffer>({
      write: async (chunk, controller) => {
        // If the chunk fits into the send buffer right now, send it immediately.
        const hasRoom =
          this.dataChannel.bufferedAmount <=
          DataConnection.MAX_BUFFERED_AMOUNT - chunk.byteLength;

        if (!hasRoom) {
          // Otherwise wait for `bufferedamountlow`. The listener is registered
          // only when it is actually awaited, so writes that fit do not leave
          // stale one-shot listeners on the channel.
          await new Promise<void>((resolve) =>
            this.dataChannel.addEventListener(
              "bufferedamountlow",
              () => resolve(),
              { once: true },
            ),
          );
        }

        // `send` may throw (for example when the channel is no longer open);
        // in that case the stream is errored and the connection is closed.
        try {
          this.dataChannel.send(chunk);
        } catch (e) {
          logger.error(`DC#:${this.connectionId} Error when sending:`, e);
          controller.error(e);
          this.close();
        }
      },
    });

    /** Writer for outgoing binary data; everything written here is chunked and sent. */
    protected writer = this._splitStream.writable.getWriter();

    /** Stream of raw `message` payloads received on the data channel. */
    protected _rawReadStream = new ReadableStream<ArrayBuffer>({
      start: (controller) => {
        // The message listener is attached only after the connection emits "open".
        this.once("open", () => {
          this.dataChannel.addEventListener("message", (e) => {
            controller.enqueue(e.data);
          });
        });
      },
    });

    /**
     * @param peerId - Forwarded unchanged to the `DataConnection` constructor.
     * @param provider - Forwarded unchanged to the `DataConnection` constructor.
     * @param options - Connection options; `reliable` is always forced to `true`.
     */
    protected constructor(peerId: string, provider: Peer, options: any) {
      super(peerId, provider, { ...options, reliable: true });

      // Fire-and-forget pipe. It rejects when the sink is errored (see `write`),
      // so the rejection is handled here instead of surfacing as an unhandled
      // promise rejection.
      void this._splitStream.readable
        .pipeTo(this._rawSendStream)
        .catch((e: unknown) => {
          logger.error(
            `DC#:${this.connectionId} Send stream pipeline stopped:`,
            e,
          );
        });
    }

    /**
     * Runs the base-class initialization, then configures the channel for
     * binary streaming: payloads arrive as `ArrayBuffer`, and
     * `bufferedamountlow` fires once the buffered amount drops to half of
     * `DataConnection.MAX_BUFFERED_AMOUNT`.
     *
     * @param dc - The data channel passed on to the base-class initializer.
     */
    public override _initializeDataChannel(dc: RTCDataChannel): void {
      super._initializeDataChannel(dc);
      this.dataChannel.binaryType = "arraybuffer";
      this.dataChannel.bufferedAmountLowThreshold =
        DataConnection.MAX_BUFFERED_AMOUNT / 2;
    }
  }