DataConnection.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import logger from "../logger";
  2. import { Negotiator } from "../negotiator";
  3. import {
  4. BaseConnectionErrorType,
  5. ConnectionType,
  6. DataConnectionErrorType,
  7. ServerMessageType,
  8. } from "../enums";
  9. import type { Peer } from "../peer";
  10. import {
  11. BaseConnection,
  12. type BaseConnectionEvents,
  13. IBaseConnection,
  14. } from "../baseconnection";
  15. import type { ServerMessage } from "../servermessage";
  16. import { randomToken } from "../utils/randomToken";
  17. export interface DataConnectionEvents
  18. extends BaseConnectionEvents<
  19. DataConnectionErrorType | BaseConnectionErrorType
  20. > {
  21. /**
  22. * Emitted when data is received from the remote peer.
  23. */
  24. data: (data: unknown) => void;
  25. /**
  26. * Emitted when the connection is established and ready-to-use.
  27. */
  28. open: () => void;
  29. }
  30. export interface IDataConnection
  31. extends IBaseConnection<DataConnectionEvents, DataConnectionErrorType> {
  32. get type(): ConnectionType.Data;
  33. /** Allows user to close connection. */
  34. close(options?: { flush?: boolean }): void;
  35. /** Allows user to send data. */
  36. send(data: any, chunked?: boolean): void;
  37. }
  38. /**
  39. * Wraps a DataChannel between two Peers.
  40. */
  41. export abstract class DataConnection extends BaseConnection<
  42. IDataConnection,
  43. DataConnectionEvents,
  44. DataConnectionErrorType
  45. > {
  46. protected static readonly ID_PREFIX = "dc_";
  47. protected static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
  48. private _negotiator: Negotiator<
  49. DataConnectionEvents,
  50. DataConnectionErrorType,
  51. this
  52. >;
  53. abstract readonly serialization: string;
  54. readonly reliable: boolean;
  55. public get type(): ConnectionType.Data {
  56. return ConnectionType.Data;
  57. }
  58. constructor(peerId: string, provider: Peer, options: any) {
  59. super(peerId, provider, options);
  60. this.connectionId =
  61. this.options.connectionId || DataConnection.ID_PREFIX + randomToken();
  62. this.label = this.options.label || this.connectionId;
  63. this.reliable = !!this.options.reliable;
  64. this._negotiator = new Negotiator(this);
  65. this._negotiator.startConnection(
  66. this.options._payload || {
  67. originator: true,
  68. reliable: this.reliable,
  69. },
  70. );
  71. }
  72. /** Called by the Negotiator when the DataChannel is ready. */
  73. override _initializeDataChannel(dc: RTCDataChannel): void {
  74. this.dataChannel = dc;
  75. this.dataChannel.onopen = () => {
  76. logger.log(`DC#${this.connectionId} dc connection success`);
  77. this._open = true;
  78. this.emit("open");
  79. };
  80. this.dataChannel.onmessage = (e) => {
  81. logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
  82. // this._handleDataMessage(e);
  83. };
  84. this.dataChannel.onclose = () => {
  85. logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
  86. this.close();
  87. };
  88. }
  89. /**
  90. * Exposed functionality for users.
  91. */
  92. close(options?: { flush?: boolean }): void {
  93. if (options?.flush) {
  94. this.send({
  95. __peerData: {
  96. type: "close",
  97. },
  98. });
  99. return;
  100. }
  101. if (this._negotiator) {
  102. this._negotiator.cleanup();
  103. this._negotiator = null;
  104. }
  105. if (this.provider) {
  106. this.provider._removeConnection(this);
  107. this.provider = null;
  108. }
  109. if (this.dataChannel) {
  110. this.dataChannel.onopen = null;
  111. this.dataChannel.onmessage = null;
  112. this.dataChannel.onclose = null;
  113. this.dataChannel = null;
  114. }
  115. if (!this.open) {
  116. return;
  117. }
  118. this._open = false;
  119. super.emit("close");
  120. }
  121. protected abstract _send(data: any, chunked: boolean): void;
  122. public send(data: any, chunked = false) {
  123. if (!this.open) {
  124. this.emitError(
  125. DataConnectionErrorType.NotOpenYet,
  126. "Connection is not open. You should listen for the `open` event before sending messages.",
  127. );
  128. return;
  129. }
  130. return this._send(data, chunked);
  131. }
  132. async handleMessage(message: ServerMessage) {
  133. const payload = message.payload;
  134. switch (message.type) {
  135. case ServerMessageType.Answer:
  136. await this._negotiator.handleSDP(message.type, payload.sdp);
  137. break;
  138. case ServerMessageType.Candidate:
  139. await this._negotiator.handleCandidate(payload.candidate);
  140. break;
  141. default:
  142. logger.warn(
  143. "Unrecognized message type:",
  144. message.type,
  145. "from peer:",
  146. this.peer,
  147. );
  148. break;
  149. }
  150. }
  151. }