diff --git a/core/app.js b/core/app.js index 477000d..bc56c5a 100644 --- a/core/app.js +++ b/core/app.js @@ -196,14 +196,28 @@ class ParusAppServer { if (!sCheckResult) { //Создаём подключение к БД this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); - //Создаём обработчик очереди исходящих - this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); - //Создаём обработчик очереди входящих - this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger }); //Создаём модуль рассылки уведомлений this.notifier = new ntf.Notifier({ logger: this.logger, mail: prms.config.mail }); + //Создаём обработчик очереди исходящих + this.outQ = new oq.OutQueue({ + outGoing: prms.config.outGoing, + dbConn: this.dbConn, + logger: this.logger, + notifier: this.notifier + }); + //Создаём обработчик очереди входящих + this.inQ = new iq.InQueue({ + inComing: prms.config.inComing, + dbConn: this.dbConn, + logger: this.logger, + notifier: this.notifier + }); //Создаём контроллер доступности удалённых сервисов - this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, notifier: this.notifier }); + this.srvAvlCtrl = new sac.ServiceAvailableController({ + logger: this.logger, + notifier: this.notifier, + dbConn: this.dbConn + }); //Скажем что инициализировали await this.logger.info("Сервер приложений инициализирован"); } else { diff --git a/core/in_queue.js b/core/in_queue.js index 61d12fd..ac6234c 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -60,6 +60,8 @@ class InQueue extends EventEmitter { this.dbConn = prms.dbConn; //Запомним логгер this.logger = prms.logger; + //Запомним уведомитель + this.notifier = prms.notifier; //WEB-приложение this.webApp = express(); //WEB-сервер @@ -256,27 +258,45 @@ class InQueue extends EventEmitter { //Фиксируем успех обработки - в протоколе работы сервиса await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); } catch (e) { + //Тема и текст уведомления об ошибке + let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${ + prms.function.sCode + }" сервиса "${prms.service.sCode}"`; + let sMessage = makeErrorText(e); //Если сообщение очереди успели создать if (q) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, - sExecMsg: makeErrorText(e), + sExecMsg: sMessage, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR }); //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса await this.logger.error( - `Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`, + `Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId } ); + //Добавим чуть больше информации в тему сообщения + sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${ + prms.function.sCode + }" сервиса "${prms.service.sCode}"`; } else { //Ограничимся общей ошибкой - await this.logger.error(makeErrorText(e), { + await this.logger.error(sMessage, { nServiceId: prms.service.nId, nServiceFnId: prms.function.nId }); } + //Если для функции-обработчика указан признак необходимости оповещения об ошибках + if (prms.function.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) { + //Отправим уведомление об ошибке отработки в почту + await this.notifier.addMessage({ + sTo: prms.function.sErrNtfMail, + sSubject, + sMessage + }); + } //Отправим ошибку клиенту prms.res.status(500).send(makeErrorText(e)); } diff --git a/core/out_queue.js b/core/out_queue.js index 165fc1e..0964ac4 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -16,6 +16,7 @@ const { makeErrorText, validateObject } = require("./utils"); //Вспомога const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди +const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса //-------------------------- @@ -60,6 +61,8 @@ class OutQueue extends EventEmitter { this.dbConn = prms.dbConn; //Запомним логгер this.logger = prms.logger; + //Запомним уведомитель + this.notifier = prms.notifier; //Список обрабатываемых в текущий момент сообщений очереди this.inProgress = []; //Привяжем методы к указателю на себя для использования в обработчиках событий @@ -134,7 +137,7 @@ class OutQueue extends EventEmitter { } //Старт обработчика startQueueProcessor(prms) { - //Проверяем структуру переданного объекта + //Проверяем структуру переданного объекта для старта обработчика let sCheckResult = validateObject( prms, prmsOutQueueSchema.startQueueProcessor, @@ -161,7 +164,7 @@ class OutQueue extends EventEmitter { } //Останов обработчика stopQueueProcessor(prms) { - //Проверяем структуру переданного объекта для старта + //Проверяем структуру переданного объекта для останова обработчика let sCheckResult = validateObject( prms, prmsOutQueueSchema.stopQueueProcessor, @@ -179,6 +182,41 @@ class OutQueue extends EventEmitter { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } + //Оповещение об ошибке исполнения сообщения + async notifyMessageProcessError(prms) { + try { + //Проверяем структуру переданного объекта для отправки оповещения + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.notifyMessageProcessError, + "Параметры функции оповещения об ошибке исполнения сообщения" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Найдем сервис и функцию, исполнявшие данное сообщение + let service = _.find(this.services, { nId: prms.queue.nServiceId }); + let func = _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { + nId: prms.queue.nServiceFnId + }); + //Если нашли и для функции-обработчика указан признак необходимости оповещения об ошибках + if (service && func && func.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) + //Отправим уведомление об ошибке отработки в почту + await this.notifier.addMessage({ + sTo: func.sErrNtfMail, + sSubject: `Ошибка обработки исходящего сообщения ${ + prms.queue.nId + } сервером приложений для функции "${func.sCode}" сервиса "${service.sCode}"`, + sMessage: prms.queue.sExecMsg + }); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } catch (e) { + await this.logger.error( + `При отправке уведомления об ошибке обработки исходящего сообщения: ${makeErrorText(e)}` + ); + } + } //Запуск обработки очередного сообщения processMessage(prms) { //Проверяем структуру переданного объекта @@ -234,7 +272,7 @@ class OutQueue extends EventEmitter { //Запись в протокол работы сервиса await self.logger.error(sErrorLog, { nQueueId: prms.queue.nId }); //Запись в статус сообщения - await this.dbConn.setQueueState({ + prms.queue = await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: sError, nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, @@ -246,6 +284,9 @@ class OutQueue extends EventEmitter { : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); } + //Если исполнение завершилось полностью и с ошибкой - расскажем об этом + if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) + await this.notifyMessageProcessError(prms); //Останавливаем обработчик и инкрементируем флаг их доступного количества try { this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); @@ -266,7 +307,7 @@ class OutQueue extends EventEmitter { nQueueId: prms.queue.nId }); //Фиксируем ошибку обработки - статус сообщения - await this.dbConn.setQueueState({ + prms.queue = await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: makeErrorText(e), nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, @@ -276,6 +317,9 @@ class OutQueue extends EventEmitter { ? prms.queue.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); + //Если исполнение завершилось полностью и с ошибкой - расскажем об этом + if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) + await this.notifyMessageProcessError(prms); //Останавливаем обработчик и инкрементируем флаг их доступного количества try { this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); @@ -323,7 +367,7 @@ class OutQueue extends EventEmitter { this.processMessage({ queue: outMsgs[i] }); } catch (e) { //Фиксируем ошибку обработки сервером приложений - статус сообщения - await this.dbConn.setQueueState({ + let queue = await this.dbConn.setQueueState({ nQueueId: outMsgs[i].nId, sExecMsg: makeErrorText(e), nIncExecCnt: NINC_EXEC_CNT_YES, @@ -334,6 +378,9 @@ class OutQueue extends EventEmitter { }); //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений await this.logger.error(makeErrorText(e), { nQueueId: outMsgs[i].nId }); + //Если исполнение завершилось полностью и с ошибкой - расскажем об этом + if (queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) + await this.notifyMessageProcessError({ queue }); } } } diff --git a/models/prms_in_queue.js b/models/prms_in_queue.js index 365f000..e6d72c5 100644 --- a/models/prms_in_queue.js +++ b/models/prms_in_queue.js @@ -15,6 +15,7 @@ const { DBConnector } = require("../core/db_connector"); //Класс взаим const { Logger } = require("../core/logger"); //Класс для протоколирования работы const { Service } = require("./obj_service"); //Схема валидации сервиса const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса +const { Notifier } = require("../core/notifier"); //Класс рассылки уведомлений //------------------ // Интерфейс модуля @@ -49,6 +50,16 @@ exports.InQueue = new Schema({ `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, required: path => `Не указаны объект для протоколирования работы (${path})` } + }, + //Объект для рассылки уведомлений + notifier: { + type: Notifier, + required: true, + message: { + type: path => + `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`, + required: path => `Не указан объект для рассылки уведомлений (${path})` + } } }); diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 4c9188f..734919c 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -13,6 +13,7 @@ const { defServices } = require("./obj_services"); //Схема валидаци const { Queue } = require("./obj_queue"); //Схема валидации сообщения очереди const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД const { Logger } = require("../core/logger"); //Класс для протоколирования работы +const { Notifier } = require("../core/notifier"); //Класс рассылки уведомлений //------------- // Тело модуля @@ -55,6 +56,16 @@ exports.OutQueue = new Schema({ `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, required: path => `Не указаны объект для протоколирования работы (${path})` } + }, + //Объект для рассылки уведомлений + notifier: { + type: Notifier, + required: true, + message: { + type: path => + `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`, + required: path => `Не указан объект для рассылки уведомлений (${path})` + } } }); @@ -142,6 +153,18 @@ exports.stopQueueProcessor = new Schema({ } }); +//Схема валидации параметров функции оповещения об ошибке исполнения сообщения обмена +exports.notifyMessageProcessError = new Schema({ + //Обрабатываемое исходящее сообщение + queue: { + schema: Queue, + required: true, + message: { + required: path => `Не указано обрабатываемое исходящее сообщение (${path})` + } + } +}); + //Схема валидации параметров функции передачи исходящего сообшения на обработку exports.processMessage = new Schema({ //Обрабатываемое исходящее сообщение