BufferedConnection.ts 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import logger from "../../logger";
  2. import { DataConnection } from "../DataConnection";
  /**
   * A {@link DataConnection} that queues outgoing binary messages whenever the
   * underlying data channel cannot take them right away, and retries them
   * later in the order they were queued.
   *
   * Subclasses implement `_handleDataMessage` to consume incoming messages and
   * call `_bufferedSend` to transmit outgoing ones.
   */
  export abstract class BufferedConnection extends DataConnection {
    /** Messages waiting to be sent, oldest first. */
    private _buffer: ArrayBuffer[] = [];
    /** Mirror of `_buffer.length`, exposed through `bufferSize`. */
    private _bufferSize = 0;
    /** While true, `_bufferedSend` queues messages without trying to send. */
    private _buffering = false;

    /** Number of messages currently waiting in the outgoing queue. */
    public get bufferSize(): number {
      return this._bufferSize;
    }

    /**
     * Runs the base-class initialization, switches the channel to
     * `"arraybuffer"` binary mode and forwards every `message` event to
     * `_handleDataMessage`.
     *
     * @param dc - The data channel handed to the base-class initializer.
     */
    public override _initializeDataChannel(dc: RTCDataChannel) {
      super._initializeDataChannel(dc);
      this.dataChannel.binaryType = "arraybuffer";
      this.dataChannel.addEventListener("message", (e) =>
        this._handleDataMessage(e),
      );
    }

    /** Handles a `message` event received on the data channel. */
    protected abstract _handleDataMessage(e: MessageEvent): void;

    /**
     * Sends `msg` immediately when possible; otherwise appends it to the queue.
     *
     * @param msg - The binary payload to transmit.
     */
    protected _bufferedSend(msg: ArrayBuffer): void {
      // While buffering, skip the send attempt so queued messages stay ahead
      // of newer ones.
      if (this._buffering || !this._trySend(msg)) {
        this._buffer.push(msg);
        this._bufferSize = this._buffer.length;
      }
    }

    /**
     * Attempts to hand one message to the data channel.
     *
     * @param msg - The binary payload to transmit.
     * @returns `true` if the message was passed to the channel, `false` if the
     *   connection is not open, the channel is over its buffered-amount limit,
     *   or `send` threw.
     */
    private _trySend(msg: ArrayBuffer): boolean {
      if (!this.open) {
        return false;
      }

      if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
        // Back off: queue everything for 50 ms, then try to drain the queue.
        this._buffering = true;
        setTimeout(() => {
          this._buffering = false;
          this._tryBuffer();
        }, 50);
        return false;
      }

      try {
        this.dataChannel.send(msg);
      } catch (e) {
        logger.error(`DC#:${this.connectionId} Error when sending:`, e);
        // A failed send is treated as fatal: stop sending and close.
        this._buffering = true;
        this.close();
        return false;
      }

      return true;
    }

    /**
     * Drains the queue from the front, stopping at the first message that
     * cannot be sent (that message stays at the head of the queue).
     *
     * Implemented as a loop rather than one recursive call per sent message,
     * so a long backlog cannot exhaust the call stack.
     */
    private _tryBuffer(): void {
      while (this.open) {
        const next = this._buffer[0];
        // `undefined` means the queue is empty; a failed send means stop.
        if (next === undefined || !this._trySend(next)) {
          return;
        }
        this._buffer.shift();
        this._bufferSize = this._buffer.length;
      }
    }

    /**
     * Closes the connection.
     *
     * @param options - When `options.flush` is truthy, a `close` control
     *   message is passed to `send` and the method returns without clearing
     *   the queue or calling the base-class `close` here. Otherwise the queue
     *   is discarded and the base-class `close` runs.
     */
    public override close(options?: { flush?: boolean }) {
      if (options?.flush) {
        this.send({
          __peerData: {
            type: "close",
          },
        });
        return;
      }
      this._buffer = [];
      this._bufferSize = 0;
      super.close();
    }
  }