Внедрение обработчика очереди входящих сообщений в приложение

This commit is contained in:
Mikhail Chechnev 2018-12-11 19:12:29 +03:00
parent debaca0efa
commit e6545f2e77

View File

@ -10,6 +10,7 @@
const lg = require("./logger"); //Протоколирование работы const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД const db = require("./db_connector"); //Взаимодействие с БД
const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений
const iq = require("./in_queue"); //Прослушивание очереди входящих сообщений
const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
@ -30,6 +31,8 @@ class ParusAppServer {
this.dbConn = null; this.dbConn = null;
//Обработчик очереди исходящих //Обработчик очереди исходящих
this.outQ = null; this.outQ = null;
//Обработчик очереди входящих
this.inQ = null;
//Контроллер доступности удалённых сервисов //Контроллер доступности удалённых сервисов
this.srvAvlCtrl = null; this.srvAvlCtrl = null;
//Флаг остановки сервера //Флаг остановки сервера
@ -41,6 +44,8 @@ class ParusAppServer {
this.onDBDisconnected = this.onDBDisconnected.bind(this); this.onDBDisconnected = this.onDBDisconnected.bind(this);
this.onOutQStarted = this.onOutQStarted.bind(this); this.onOutQStarted = this.onOutQStarted.bind(this);
this.onOutQStopped = this.onOutQStopped.bind(this); this.onOutQStopped = this.onOutQStopped.bind(this);
this.onInQStarted = this.onInQStarted.bind(this);
this.onInQStopped = this.onInQStopped.bind(this);
this.onServiceACStarted = this.onServiceACStarted.bind(this); this.onServiceACStarted = this.onServiceACStarted.bind(this);
this.onServiceACStopped = this.onServiceACStopped.bind(this); this.onServiceACStopped = this.onServiceACStopped.bind(this);
} }
@ -81,8 +86,30 @@ class ParusAppServer {
async onOutQStarted() { async onOutQStarted() {
//Сообщим, что запустили обработчик //Сообщим, что запустили обработчик
await this.logger.info("Обработчик очереди исходящих сообщений запущен"); await this.logger.info("Обработчик очереди исходящих сообщений запущен");
//Запускаем бслуживание очереди входящих
//Запускаем обслуживание очереди исходящих await this.logger.info("Запуск обработчика очереди входящих сообщений...");
try {
this.inQ.startProcessing({ services: this.services });
} catch (e) {
await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
await this.stop();
return;
}
}
//При останове обработчика исходящих сообщений
async onOutQStopped() {
//Сообщим, что остановили обработчик
await this.logger.warn("Обработчик очереди исходящих сообщений остановлен");
//Останавливаем очередь обработки входящих
await this.logger.warn("Останов обработчика очереди входящих сообщений...");
if (this.inQ) this.inQ.stopProcessing();
else await this.onInQStopped();
}
//При запуске обработчика входящих сообщений
async onInQStarted() {
//Сообщим, что запустили обработчик
await this.logger.info("Обработчик очереди входящих сообщений запущен");
//Запускаем контроллер доступности удалённых сервисов
await this.logger.info("Запуск контроллера доступности удалённых сервисов..."); await this.logger.info("Запуск контроллера доступности удалённых сервисов...");
try { try {
this.srvAvlCtrl.startController({ services: this.services }); this.srvAvlCtrl.startController({ services: this.services });
@ -91,13 +118,11 @@ class ParusAppServer {
await this.stop(); await this.stop();
return; return;
} }
//Рапортуем, что запустились
await this.logger.info("Сервер приложений запущен");
} }
//При останове обработчика исходящих сообщений //При останове обработчика входящих сообщений
async onOutQStopped() { async onInQStopped() {
//Сообщим, что остановили обработчик //Сообщим, что остановили обработчик
await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); await this.logger.warn("Обработчик очереди входящих сообщений остановлен");
//Останавливаем контроллер доступности удалённных сервисов //Останавливаем контроллер доступности удалённных сервисов
await this.logger.warn("Останов контроллера доступности удалённых сервисов..."); await this.logger.warn("Останов контроллера доступности удалённых сервисов...");
if (this.srvAvlCtrl) this.srvAvlCtrl.stopController(); if (this.srvAvlCtrl) this.srvAvlCtrl.stopController();
@ -107,6 +132,8 @@ class ParusAppServer {
async onServiceACStarted() { async onServiceACStarted() {
//Сообщим, что запустили обработчик //Сообщим, что запустили обработчик
await this.logger.info("Контроллер доступности удалённых сервисов запущен"); await this.logger.info("Контроллер доступности удалённых сервисов запущен");
//Рапортуем, что запустились
await this.logger.info("Сервер приложений запущен");
} }
//При останове контроллера доступности удаленных сервисов //При останове контроллера доступности удаленных сервисов
async onServiceACStopped() { async onServiceACStopped() {
@ -141,6 +168,8 @@ class ParusAppServer {
this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect });
//Создаём обработчик очереди исходящих //Создаём обработчик очереди исходящих
this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger });
//Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger });
//Создаём контроллер доступности удалённых сервислв //Создаём контроллер доступности удалённых сервислв
this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail }); this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail });
//Скажем что инициализировали //Скажем что инициализировали
@ -163,6 +192,9 @@ class ParusAppServer {
//Включим прослушивание событий обработчика исходящих сообщений //Включим прослушивание событий обработчика исходящих сообщений
this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted); this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted);
this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped); this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped);
//Включим прослушивание событий обработчика входящих сообщений
this.inQ.on(iq.SEVT_IN_QUEUE_STARTED, this.onInQStarted);
this.inQ.on(iq.SEVT_IN_QUEUE_STOPPED, this.onInQStopped);
//Включим прослушивание событий контроллера доступности удалённых сервисов //Включим прослушивание событий контроллера доступности удалённых сервисов
this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted); this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted);
this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped); this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped);