From e6545f2e77a97882ec7fd291deec7633587dde31 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Tue, 11 Dec 2018 19:12:29 +0300 Subject: [PATCH] =?UTF-8?q?=D0=92=D0=BD=D0=B5=D0=B4=D1=80=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D1=87?= =?UTF-8?q?=D0=B8=D0=BA=D0=B0=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B8?= =?UTF-8?q?=20=D0=B2=D1=85=D0=BE=D0=B4=D1=8F=D1=89=D0=B8=D1=85=20=D1=81?= =?UTF-8?q?=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D0=B9=20=D0=B2=20?= =?UTF-8?q?=D0=BF=D1=80=D0=B8=D0=BB=D0=BE=D0=B6=D0=B5=D0=BD=D0=B8=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/app.js | 46 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/core/app.js b/core/app.js index 69c328f..227bd68 100644 --- a/core/app.js +++ b/core/app.js @@ -10,6 +10,7 @@ const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений +const iq = require("./in_queue"); //Прослушивание очереди входящих сообщений const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов const { ServerError } = require("./server_errors"); //Типовая ошибка const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции @@ -30,6 +31,8 @@ class ParusAppServer { this.dbConn = null; //Обработчик очереди исходящих this.outQ = null; + //Обработчик очереди входящих + this.inQ = null; //Контроллер доступности удалённых сервисов this.srvAvlCtrl = null; //Флаг остановки сервера @@ -41,6 +44,8 @@ class ParusAppServer { this.onDBDisconnected = this.onDBDisconnected.bind(this); this.onOutQStarted = this.onOutQStarted.bind(this); this.onOutQStopped = this.onOutQStopped.bind(this); + this.onInQStarted = this.onInQStarted.bind(this); + this.onInQStopped = this.onInQStopped.bind(this); this.onServiceACStarted = this.onServiceACStarted.bind(this); this.onServiceACStopped = this.onServiceACStopped.bind(this); } @@ -81,8 +86,30 @@ class ParusAppServer { async onOutQStarted() { //Сообщим, что запустили обработчик await this.logger.info("Обработчик очереди исходящих сообщений запущен"); - - //Запускаем обслуживание очереди исходящих + //Запускаем бслуживание очереди входящих + await this.logger.info("Запуск обработчика очереди входящих сообщений..."); + try { + this.inQ.startProcessing({ services: this.services }); + } catch (e) { + await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); + await this.stop(); + return; + } + } + //При останове обработчика исходящих сообщений + async onOutQStopped() { + //Сообщим, что остановили обработчик + await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); + //Останавливаем очередь обработки входящих + await this.logger.warn("Останов обработчика очереди входящих сообщений..."); + if (this.inQ) this.inQ.stopProcessing(); + else await this.onInQStopped(); + } + //При запуске обработчика входящих сообщений + async onInQStarted() { + //Сообщим, что запустили обработчик + await this.logger.info("Обработчик очереди входящих сообщений запущен"); + //Запускаем контроллер доступности удалённых сервисов await this.logger.info("Запуск контроллера доступности удалённых сервисов..."); try { this.srvAvlCtrl.startController({ services: this.services }); @@ -91,13 +118,11 @@ class ParusAppServer { await this.stop(); return; } - //Рапортуем, что запустились - await this.logger.info("Сервер приложений запущен"); } - //При останове обработчика исходящих сообщений - async onOutQStopped() { + //При останове обработчика входящих сообщений + async onInQStopped() { //Сообщим, что остановили обработчик - await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); + await this.logger.warn("Обработчик очереди входящих сообщений остановлен"); //Останавливаем контроллер доступности удалённных сервисов await this.logger.warn("Останов контроллера доступности удалённых сервисов..."); if (this.srvAvlCtrl) this.srvAvlCtrl.stopController(); @@ -107,6 +132,8 @@ class ParusAppServer { async onServiceACStarted() { //Сообщим, что запустили обработчик await this.logger.info("Контроллер доступности удалённых сервисов запущен"); + //Рапортуем, что запустились + await this.logger.info("Сервер приложений запущен"); } //При останове контроллера доступности удаленных сервисов async onServiceACStopped() { @@ -141,6 +168,8 @@ class ParusAppServer { this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); //Создаём обработчик очереди исходящих this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); + //Создаём обработчик очереди входящих + this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger }); //Создаём контроллер доступности удалённых сервислв this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail }); //Скажем что инициализировали @@ -163,6 +192,9 @@ class ParusAppServer { //Включим прослушивание событий обработчика исходящих сообщений this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted); this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped); + //Включим прослушивание событий обработчика входящих сообщений + this.inQ.on(iq.SEVT_IN_QUEUE_STARTED, this.onInQStarted); + this.inQ.on(iq.SEVT_IN_QUEUE_STOPPED, this.onInQStopped); //Включим прослушивание событий контроллера доступности удалённых сервисов this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted); this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped);