WebSocketController.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. const WebSocket = require ('ws');
  2. const _ = require('lodash');
  3. const WorkerState = require('../core/WorkerState');//singleton
  4. const utils = require('../core/utils');
  5. const cleanPeriod = 1*60*1000;//1 minute, in ms: delay between periodicClean() runs
  6. const closeSocketOnIdle = 5*60*1000;//5 minutes, in ms: idle time after which periodicClean() terminates a socket
  /**
   * Serves JSON requests arriving over a WebSocket server.
   *
   * Every incoming message is parsed as JSON and routed by its `action` field
   * to one of the action methods below; replies are JSON objects that echo the
   * request's `requestId` when one was supplied. Sockets that stay idle longer
   * than `closeSocketOnIdle` are terminated by a self-rescheduling cleanup task.
   */
  class WebSocketController {
      /**
       * @param {object} wss - WebSocket server; must provide `on('connection', cb)` and `clients`.
       * @param {object} config - configuration object exposed (by key) through the `get-config` action.
       */
      constructor(wss, config) {
          this.config = config;
          this.workerState = new WorkerState();
          this.wss = wss;

          wss.on('connection', (ws) => {
              // Start the idle clock at connect time. Without this a socket that connects
              // right before a cleanup tick has no timestamp yet and is terminated before
              // its first message can arrive.
              ws.lastActivity = Date.now();

              ws.on('message', (message) => {
                  this.onMessage(ws, message);
              });
          });

          setTimeout(() => { this.periodicClean(); }, cleanPeriod);
      }

      /**
       * Terminates every client socket that has been idle for (almost) `closeSocketOnIdle`
       * milliseconds, then re-arms itself to run again after `cleanPeriod`.
       */
      periodicClean() {
          try {
              const now = Date.now();

              this.wss.clients.forEach((ws) => {
                  // NOTE(review): the 50 ms margin presumably compensates for timer jitter - confirm.
                  if (!ws.lastActivity || now - ws.lastActivity > closeSocketOnIdle - 50) {
                      ws.terminate();
                  }
              });
          } finally {
              // Re-arm even if a terminate() call threw, so cleanup never stops for good.
              setTimeout(() => { this.periodicClean(); }, cleanPeriod);
          }
      }

      /**
       * Parses one incoming message and dispatches it by `req.action`.
       * Any error (bad JSON, unknown action, or a failure inside the action itself)
       * is reported back to the client as `{error: message}`.
       *
       * @param {object} ws - socket the message came from.
       * @param {string|Buffer} message - raw JSON text of the request.
       * @returns {Promise<void>} resolves once the action has completed or the error was reported.
       */
      async onMessage(ws, message) {
          let req = {};
          try {
              ws.lastActivity = Date.now();

              // Keep `req` an object even for payloads such as `null`, otherwise the
              // error reply below would itself throw while reading `req.requestId`.
              const parsed = JSON.parse(message);
              if (parsed !== null && typeof parsed === 'object')
                  req = parsed;

              // The actions are async: they must be awaited, or their errors escape
              // this try/catch and surface as unhandled promise rejections.
              switch (req.action) {
                  case 'test':
                      await this.test(req, ws); break;
                  case 'get-config':
                      await this.getConfig(req, ws); break;
                  case 'worker-get-state':
                      await this.workerGetState(req, ws); break;
                  case 'worker-get-state-finish':
                      await this.workerGetStateFinish(req, ws); break;

                  default:
                      throw new Error(`Action not found: ${req.action}`);
              }
          } catch (e) {
              this.send({error: e.message}, req, ws);
          }
      }

      /**
       * Sends `res` as JSON to `ws` if the socket is open; silently does nothing otherwise.
       * A truthy `req.requestId` is copied into the reply so the client can match it.
       *
       * @param {object} res - reply payload (copied, never mutated).
       * @param {object} req - request being answered.
       * @param {object} ws - destination socket.
       */
      send(res, req, ws) {
          if (ws.readyState === WebSocket.OPEN) {
              ws.lastActivity = Date.now();
              const r = Object.assign({}, res);

              if (req.requestId)
                  r.requestId = req.requestId;

              ws.send(JSON.stringify(r));
          }
      }

      //Actions ------------------------------------------------------------------

      /** Connectivity check: replies with a fixed message. */
      async test(req, ws) {
          this.send({message: 'Liberama project is awesome'}, req, ws);
      }

      /**
       * Replies with the subset of the config named by `req.params`.
       * @throws {Error} when `req.params` is not an array.
       */
      async getConfig(req, ws) {
          if (Array.isArray(req.params)) {
              this.send(_.pick(this.config, req.params), req, ws);
          } else {
              throw new Error('params is not an array');
          }
      }

      /**
       * Replies once with the current state of worker `req.workerId`
       * (an empty object when the worker state store returns nothing).
       * @throws {Error} when `req.workerId` is missing.
       */
      async workerGetState(req, ws) {
          if (!req.workerId)
              throw new Error(`key 'workerId' is wrong`);

          const state = this.workerState.getState(req.workerId);
          this.send((state ? state : {}), req, ws);
      }

      /**
       * Streams the state of worker `req.workerId` every 200 ms until the worker reaches
       * 'finish' or 'error', its state disappears, the client socket is no longer open,
       * or neither `progress` nor `state` has changed for about 2 minutes (in which case
       * a timeout error is sent and polling stops).
       * @throws {Error} when `req.workerId` is missing.
       */
      async workerGetStateFinish(req, ws) {
          if (!req.workerId)
              throw new Error(`key 'workerId' is wrong`);

          const refreshPause = 200;
          // Wait up to 2 minutes for any sign of activity from the worker.
          const maxIdleTicks = 2*60*1000/refreshPause;

          let i = 0;
          let state = {};
          // Polling for a closed socket is pointless: send() would drop every update.
          while (ws.readyState === WebSocket.OPEN) {
              // Compare raw values: substituting defaults (e.g. -1 for a falsy progress)
              // makes progress 0/undefined look "changed" on every tick, which would
              // keep resetting the idle counter and disable the timeout.
              const prevProgress = state.progress;
              const prevState = state.state;

              state = this.workerState.getState(req.workerId);
              this.send((state ? state : {}), req, ws);

              if (!state)
                  break;
              if (state.state === 'finish' || state.state === 'error')
                  break;

              await utils.sleep(refreshPause);

              i++;
              if (i > maxIdleTicks) {
                  this.send({state: 'error', error: 'Время ожидания процесса истекло'}, req, ws);
                  break;
              }

              // Any change in progress or state restarts the idle countdown.
              if (prevProgress !== state.progress || prevState !== state.state)
                  i = 1;
          }
      }
  }
  102. module.exports = WebSocketController;