diff --git a/core/app.js b/core/app.js index 69c328f..227bd68 100644 --- a/core/app.js +++ b/core/app.js @@ -10,6 +10,7 @@ const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений +const iq = require("./in_queue"); //Прослушивание очереди входящих сообщений const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов const { ServerError } = require("./server_errors"); //Типовая ошибка const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции @@ -30,6 +31,8 @@ class ParusAppServer { this.dbConn = null; //Обработчик очереди исходящих this.outQ = null; + //Обработчик очереди входящих + this.inQ = null; //Контроллер доступности удалённых сервисов this.srvAvlCtrl = null; //Флаг остановки сервера @@ -41,6 +44,8 @@ class ParusAppServer { this.onDBDisconnected = this.onDBDisconnected.bind(this); this.onOutQStarted = this.onOutQStarted.bind(this); this.onOutQStopped = this.onOutQStopped.bind(this); + this.onInQStarted = this.onInQStarted.bind(this); + this.onInQStopped = this.onInQStopped.bind(this); this.onServiceACStarted = this.onServiceACStarted.bind(this); this.onServiceACStopped = this.onServiceACStopped.bind(this); } @@ -81,8 +86,30 @@ class ParusAppServer { async onOutQStarted() { //Сообщим, что запустили обработчик await this.logger.info("Обработчик очереди исходящих сообщений запущен"); - - //Запускаем обслуживание очереди исходящих + //Запускаем бслуживание очереди входящих + await this.logger.info("Запуск обработчика очереди входящих сообщений..."); + try { + this.inQ.startProcessing({ services: this.services }); + } catch (e) { + await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); + await this.stop(); + return; + } + } + //При останове обработчика исходящих сообщений + async onOutQStopped() { + //Сообщим, что остановили обработчик + await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); + //Останавливаем очередь обработки входящих + await this.logger.warn("Останов обработчика очереди входящих сообщений..."); + if (this.inQ) this.inQ.stopProcessing(); + else await this.onInQStopped(); + } + //При запуске обработчика входящих сообщений + async onInQStarted() { + //Сообщим, что запустили обработчик + await this.logger.info("Обработчик очереди входящих сообщений запущен"); + //Запускаем контроллер доступности удалённых сервисов await this.logger.info("Запуск контроллера доступности удалённых сервисов..."); try { this.srvAvlCtrl.startController({ services: this.services }); @@ -91,13 +118,11 @@ class ParusAppServer { await this.stop(); return; } - //Рапортуем, что запустились - await this.logger.info("Сервер приложений запущен"); } - //При останове обработчика исходящих сообщений - async onOutQStopped() { + //При останове обработчика входящих сообщений + async onInQStopped() { //Сообщим, что остановили обработчик - await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); + await this.logger.warn("Обработчик очереди входящих сообщений остановлен"); //Останавливаем контроллер доступности удалённных сервисов await this.logger.warn("Останов контроллера доступности удалённых сервисов..."); if (this.srvAvlCtrl) this.srvAvlCtrl.stopController(); @@ -107,6 +132,8 @@ class ParusAppServer { async onServiceACStarted() { //Сообщим, что запустили обработчик await this.logger.info("Контроллер доступности удалённых сервисов запущен"); + //Рапортуем, что запустились + await this.logger.info("Сервер приложений запущен"); } //При останове контроллера доступности удаленных сервисов async onServiceACStopped() { @@ -141,6 +168,8 @@ class ParusAppServer { this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); //Создаём обработчик очереди исходящих this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); + //Создаём обработчик очереди входящих + this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger }); //Создаём контроллер доступности удалённых сервислв this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail }); //Скажем что инициализировали @@ -163,6 +192,9 @@ class ParusAppServer { //Включим прослушивание событий обработчика исходящих сообщений this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted); this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped); + //Включим прослушивание событий обработчика входящих сообщений + this.inQ.on(iq.SEVT_IN_QUEUE_STARTED, this.onInQStarted); + this.inQ.on(iq.SEVT_IN_QUEUE_STOPPED, this.onInQStopped); //Включим прослушивание событий контроллера доступности удалённых сервисов this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted); this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped);