socket.ts 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import { EventEmitter } from "eventemitter3";
  2. import logger from "./logger";
  3. import { ServerMessageType, SocketEventType } from "./enums";
  4. import { version } from "../package.json";
  5. import { IncomingServerMessage, OutgoingServerMessage } from "./serverMessages";
  6. interface SocketEvents {
  7. [SocketEventType.Disconnected]: () => void;
  8. [SocketEventType.Error]: (error: "Invalid message") => void;
  9. [SocketEventType.Message]: (message: IncomingServerMessage) => void;
  10. [SocketEventType.Close]: () => void;
  11. }
  12. /**
  13. * An abstraction on top of WebSockets to provide fastest
  14. * possible connection for peers.
  15. */
  16. export class Socket extends EventEmitter<SocketEvents, never> {
  17. private _disconnected: boolean = true;
  18. private _id?: string;
  19. private _messagesQueue: Array<OutgoingServerMessage> = [];
  20. private _socket?: WebSocket;
  21. private _wsPingTimer?: any;
  22. private readonly _baseUrl: string;
  23. constructor(
  24. secure: any,
  25. host: string,
  26. port: number,
  27. path: string,
  28. key: string,
  29. private readonly pingInterval: number = 5000,
  30. ) {
  31. super();
  32. const wsProtocol = secure ? "wss://" : "ws://";
  33. this._baseUrl = wsProtocol + host + ":" + port + path + "peerjs?key=" + key;
  34. }
  35. start(id: string, token: string): void {
  36. this._id = id;
  37. const wsUrl = `${this._baseUrl}&id=${id}&token=${token}`;
  38. if (!!this._socket || !this._disconnected) {
  39. return;
  40. }
  41. this._socket = new WebSocket(wsUrl + "&version=" + version);
  42. this._disconnected = false;
  43. this._socket.onmessage = (event) => {
  44. let data;
  45. try {
  46. data = JSON.parse(event.data);
  47. logger.log("Server message received:", data);
  48. } catch (e) {
  49. logger.log("Invalid server message", event.data);
  50. return;
  51. }
  52. this.emit(SocketEventType.Message, data);
  53. };
  54. this._socket.onclose = (event) => {
  55. if (this._disconnected) {
  56. return;
  57. }
  58. logger.log("Socket closed.", event);
  59. this._cleanup();
  60. this._disconnected = true;
  61. this.emit(SocketEventType.Disconnected);
  62. };
  63. // Take care of the queue of connections if necessary and make sure Peer knows
  64. // socket is open.
  65. this._socket.onopen = () => {
  66. if (this._disconnected) {
  67. return;
  68. }
  69. this._sendQueuedMessages();
  70. logger.log("Socket open");
  71. this._scheduleHeartbeat();
  72. };
  73. }
  74. private _scheduleHeartbeat(): void {
  75. this._wsPingTimer = setTimeout(() => {
  76. this._sendHeartbeat();
  77. }, this.pingInterval);
  78. }
  79. private _sendHeartbeat(): void {
  80. if (!this._wsOpen()) {
  81. logger.log(`Cannot send heartbeat, because socket closed`);
  82. return;
  83. }
  84. const message = JSON.stringify({ type: ServerMessageType.Heartbeat });
  85. this._socket!.send(message);
  86. this._scheduleHeartbeat();
  87. }
  88. /** Is the websocket currently open? */
  89. private _wsOpen(): boolean {
  90. return !!this._socket && this._socket.readyState === 1;
  91. }
  92. /** Send queued messages. */
  93. private _sendQueuedMessages(): void {
  94. //Create copy of queue and clear it,
  95. //because send method push the message back to queue if smth will go wrong
  96. const copiedQueue = [...this._messagesQueue];
  97. this._messagesQueue = [];
  98. for (const message of copiedQueue) {
  99. this.send(message);
  100. }
  101. }
  102. /** Exposed send for DC & Peer. */
  103. send(data: OutgoingServerMessage): void {
  104. if (this._disconnected) {
  105. return;
  106. }
  107. // If we didn't get an ID yet, we can't yet send anything so we should queue
  108. // up these messages.
  109. if (!this._id) {
  110. this._messagesQueue.push(data);
  111. return;
  112. }
  113. if (!data.type) {
  114. this.emit(SocketEventType.Error, "Invalid message");
  115. return;
  116. }
  117. if (!this._wsOpen()) {
  118. return;
  119. }
  120. const message = JSON.stringify(data);
  121. this._socket!.send(message);
  122. }
  123. close(): void {
  124. if (this._disconnected) {
  125. return;
  126. }
  127. this._cleanup();
  128. this._disconnected = true;
  129. }
  130. private _cleanup(): void {
  131. if (this._socket) {
  132. this._socket.onopen =
  133. this._socket.onmessage =
  134. this._socket.onclose =
  135. null;
  136. this._socket.close();
  137. this._socket = undefined;
  138. }
  139. clearTimeout(this._wsPingTimer!);
  140. }
  141. }