WebSocketController.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. const WebSocket = require ('ws');
  2. const WorkerState = require('../core/WorkerState');//singleton
  3. const utils = require('../core/utils');
  4. const cleanPeriod = 1*60*1000;// 1 minute (ms): delay between runs of the idle-socket sweep (periodicClean)
  5. const closeSocketOnIdle = 5*60*1000;// 5 minutes (ms): a socket with no activity for about this long is terminated by the sweep
  /**
   * Handles JSON requests arriving over a `ws` WebSocket server.
   *
   * Every incoming message is parsed as JSON and dispatched by its `action`
   * field. Replies are JSON objects; when the request carried a `requestId`
   * it is echoed back in the reply. Failures are reported to the client as
   * `{error: <message>}`. Sockets that stay idle for too long are terminated
   * by a periodic sweep.
   */
  class WebSocketController {
      /**
       * @param {object} wss - WebSocket server; must provide `on('connection', ...)` and `clients`.
       * @param {object} config - Application config; only stored on the instance here.
       */
      constructor(wss, config) {
          this.config = config;
          this.workerState = new WorkerState();
          this.wss = wss;

          wss.on('connection', (ws) => {
              // Start the idle clock at connect time. Without this a socket that
              // connects right before a sweep has no `lastActivity` yet and is
              // terminated before it gets a chance to send its first message.
              ws.lastActivity = Date.now();

              ws.on('message', (message) => {
                  this.onMessage(ws, message);
              });
          });

          setTimeout(() => { this.periodicClean(); }, cleanPeriod);
      }

      /**
       * Terminates every client socket that has been idle for (almost)
       * `closeSocketOnIdle` ms, then re-arms itself to run again in `cleanPeriod` ms.
       */
      periodicClean() {
          try {
              const now = Date.now();
              this.wss.clients.forEach((ws) => {
                  // The 50 ms of slack is presumably there to absorb timer jitter.
                  if (!ws.lastActivity || now - ws.lastActivity > closeSocketOnIdle - 50) {
                      ws.terminate();
                  }
              });
          } finally {
              // Re-arm even if the sweep threw, so cleaning never stops for good.
              setTimeout(() => { this.periodicClean(); }, cleanPeriod);
          }
      }

      /**
       * Parses one incoming message and runs the matching action.
       * Never rejects because of a bad request: parse errors, unknown actions
       * and errors thrown by the action handlers are all sent back to the
       * client as `{error: <message>}`.
       *
       * @param {object} ws - Socket the message arrived on.
       * @param {string|Buffer} message - Raw JSON payload.
       * @returns {Promise<void>}
       */
      async onMessage(ws, message) {
          let req = {};
          try {
              ws.lastActivity = Date.now();

              const parsed = JSON.parse(message);
              // JSON `null` is valid input but has no properties; treat it as an
              // empty request so it is answered like any other unknown action.
              req = (parsed === null ? {} : parsed);

              // The handlers are async: they must be awaited, otherwise their
              // rejections escape this try/catch and become unhandled rejections.
              switch (req.action) {
                  case 'test':
                      await this.test(req, ws); break;
                  case 'worker-get-state':
                      await this.workerGetState(req, ws); break;
                  case 'worker-get-state-finish':
                      await this.workerGetStateFinish(req, ws); break;

                  default:
                      throw new Error(`Action not found: ${req.action}`);
              }
          } catch (e) {
              this.send({error: e.message}, req, ws);
          }
      }

      /**
       * Sends `res` as JSON to `ws` if the socket is open; otherwise does nothing.
       * A shallow copy of `res` is sent, so the caller's object is not modified.
       *
       * @param {object} res - Response payload.
       * @param {object} req - Originating request; its `requestId` (if any) is echoed.
       * @param {object} ws - Target socket.
       */
      send(res, req, ws) {
          if (ws.readyState === WebSocket.OPEN) {
              ws.lastActivity = Date.now();
              const r = Object.assign({}, res);
              if (req && req.requestId)
                  r.requestId = req.requestId;
              ws.send(JSON.stringify(r));
          }
      }

      //Actions ------------------------------------------------------------------
      /** Replies with a fixed greeting; useful as a connectivity check. */
      async test(req, ws) {
          this.send({message: 'Liberama project is awesome'}, req, ws);
      }

      /**
       * Replies once with the current state of worker `req.workerId`
       * (an empty object when `getState` returns nothing).
       *
       * @throws {Error} when `req.workerId` is missing.
       */
      async workerGetState(req, ws) {
          if (!req.workerId)
              throw new Error(`key 'workerId' is wrong`);

          const state = this.workerState.getState(req.workerId);
          this.send((state ? state : {}), req, ws);
      }

      /**
       * Streams the state of worker `req.workerId` every 200 ms until one of:
       *  - the worker state is missing (an empty object is sent),
       *  - the worker reaches state 'finish' or 'error',
       *  - neither `progress` nor `state` changed for about 2 minutes
       *    (a final `{state: 'error', ...}` message is sent),
       *  - the socket is no longer open.
       *
       * @throws {Error} when `req.workerId` is missing.
       */
      async workerGetStateFinish(req, ws) {
          if (!req.workerId)
              throw new Error(`key 'workerId' is wrong`);

          const refreshPause = 200;
          // Wait up to 2 minutes for any sign of activity from the worker.
          const maxIdleTicks = 2*60*1000/refreshPause;

          let idleTicks = 0;
          // Snapshots of the last observed values. Primitives are kept rather
          // than the state object itself, in case getState() hands back the
          // same object mutated in place.
          let lastProgress;
          let lastState;

          // Nobody is listening once the socket is closed, so stop polling.
          while (ws.readyState === WebSocket.OPEN) {
              const state = this.workerState.getState(req.workerId);
              this.send((state ? state : {}), req, ws);

              if (!state || state.state === 'finish' || state.state === 'error')
                  break;

              if (state.progress !== lastProgress || state.state !== lastState) {
                  // Strict comparison: a progress of 0 is a real value, not "no value".
                  lastProgress = state.progress;
                  lastState = state.state;
                  idleTicks = 0;
              } else if (++idleTicks >= maxIdleTicks) {
                  this.send({state: 'error', error: 'Время ожидания процесса истекло'}, req, ws);
                  break;
              }

              await utils.sleep(refreshPause);
          }
      }
  }
  92. module.exports = WebSocketController;