Рассылка уведомлений об ошибках обработки сообщений обмена

This commit is contained in:
Mikhail Chechnev 2019-01-07 17:49:50 +03:00
parent a522cdf3b6
commit 8428a733c6
5 changed files with 128 additions and 13 deletions

View File

@ -196,14 +196,28 @@ class ParusAppServer {
if (!sCheckResult) { if (!sCheckResult) {
//Создаём подключение к БД //Создаём подключение к БД
this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect });
//Создаём обработчик очереди исходящих
this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger });
//Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger });
//Создаём модуль рассылки уведомлений //Создаём модуль рассылки уведомлений
this.notifier = new ntf.Notifier({ logger: this.logger, mail: prms.config.mail }); this.notifier = new ntf.Notifier({ logger: this.logger, mail: prms.config.mail });
//Создаём обработчик очереди исходящих
this.outQ = new oq.OutQueue({
outGoing: prms.config.outGoing,
dbConn: this.dbConn,
logger: this.logger,
notifier: this.notifier
});
//Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({
inComing: prms.config.inComing,
dbConn: this.dbConn,
logger: this.logger,
notifier: this.notifier
});
//Создаём контроллер доступности удалённых сервисов //Создаём контроллер доступности удалённых сервисов
this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, notifier: this.notifier }); this.srvAvlCtrl = new sac.ServiceAvailableController({
logger: this.logger,
notifier: this.notifier,
dbConn: this.dbConn
});
//Скажем что инициализировали //Скажем что инициализировали
await this.logger.info("Сервер приложений инициализирован"); await this.logger.info("Сервер приложений инициализирован");
} else { } else {

View File

@ -60,6 +60,8 @@ class InQueue extends EventEmitter {
this.dbConn = prms.dbConn; this.dbConn = prms.dbConn;
//Запомним логгер //Запомним логгер
this.logger = prms.logger; this.logger = prms.logger;
//Запомним уведомитель
this.notifier = prms.notifier;
//WEB-приложение //WEB-приложение
this.webApp = express(); this.webApp = express();
//WEB-сервер //WEB-сервер
@ -256,27 +258,45 @@ class InQueue extends EventEmitter {
//Фиксируем успех обработки - в протоколе работы сервиса //Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
} catch (e) { } catch (e) {
//Тема и текст уведомления об ошибке
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${
prms.function.sCode
}" сервиса "${prms.service.sCode}"`;
let sMessage = makeErrorText(e);
//Если сообщение очереди успели создать //Если сообщение очереди успели создать
if (q) { if (q) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения //Фиксируем ошибку обработки сервером приложений - в статусе сообщения
q = await this.dbConn.setQueueState({ q = await this.dbConn.setQueueState({
nQueueId: q.nId, nQueueId: q.nId,
sExecMsg: makeErrorText(e), sExecMsg: sMessage,
nIncExecCnt: NINC_EXEC_CNT_YES, nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
}); });
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await this.logger.error( await this.logger.error(
`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`, `Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`,
{ nQueueId: q.nId } { nQueueId: q.nId }
); );
//Добавим чуть больше информации в тему сообщения
sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${
prms.function.sCode
}" сервиса "${prms.service.sCode}"`;
} else { } else {
//Ограничимся общей ошибкой //Ограничимся общей ошибкой
await this.logger.error(makeErrorText(e), { await this.logger.error(sMessage, {
nServiceId: prms.service.nId, nServiceId: prms.service.nId,
nServiceFnId: prms.function.nId nServiceFnId: prms.function.nId
}); });
} }
//Если для функции-обработчика указан признак необходимости оповещения об ошибках
if (prms.function.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) {
//Отправим уведомление об ошибке отработки в почту
await this.notifier.addMessage({
sTo: prms.function.sErrNtfMail,
sSubject,
sMessage
});
}
//Отправим ошибку клиенту //Отправим ошибку клиенту
prms.res.status(500).send(makeErrorText(e)); prms.res.status(500).send(makeErrorText(e));
} }

View File

@ -16,6 +16,7 @@ const { makeErrorText, validateObject } = require("./utils"); //Вспомога
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
//-------------------------- //--------------------------
@ -60,6 +61,8 @@ class OutQueue extends EventEmitter {
this.dbConn = prms.dbConn; this.dbConn = prms.dbConn;
//Запомним логгер //Запомним логгер
this.logger = prms.logger; this.logger = prms.logger;
//Запомним уведомитель
this.notifier = prms.notifier;
//Список обрабатываемых в текущий момент сообщений очереди //Список обрабатываемых в текущий момент сообщений очереди
this.inProgress = []; this.inProgress = [];
//Привяжем методы к указателю на себя для использования в обработчиках событий //Привяжем методы к указателю на себя для использования в обработчиках событий
@ -134,7 +137,7 @@ class OutQueue extends EventEmitter {
} }
//Старт обработчика //Старт обработчика
startQueueProcessor(prms) { startQueueProcessor(prms) {
//Проверяем структуру переданного объекта //Проверяем структуру переданного объекта для старта обработчика
let sCheckResult = validateObject( let sCheckResult = validateObject(
prms, prms,
prmsOutQueueSchema.startQueueProcessor, prmsOutQueueSchema.startQueueProcessor,
@ -161,7 +164,7 @@ class OutQueue extends EventEmitter {
} }
//Останов обработчика //Останов обработчика
stopQueueProcessor(prms) { stopQueueProcessor(prms) {
//Проверяем структуру переданного объекта для старта //Проверяем структуру переданного объекта для останова обработчика
let sCheckResult = validateObject( let sCheckResult = validateObject(
prms, prms,
prmsOutQueueSchema.stopQueueProcessor, prmsOutQueueSchema.stopQueueProcessor,
@ -179,6 +182,41 @@ class OutQueue extends EventEmitter {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
} }
} }
//Оповещение об ошибке исполнения сообщения
async notifyMessageProcessError(prms) {
try {
//Проверяем структуру переданного объекта для отправки оповещения
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.notifyMessageProcessError,
"Параметры функции оповещения об ошибке исполнения сообщения"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Найдем сервис и функцию, исполнявшие данное сообщение
let service = _.find(this.services, { nId: prms.queue.nServiceId });
let func = _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
nId: prms.queue.nServiceFnId
});
//Если нашли и для функции-обработчика указан признак необходимости оповещения об ошибках
if (service && func && func.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES)
//Отправим уведомление об ошибке отработки в почту
await this.notifier.addMessage({
sTo: func.sErrNtfMail,
sSubject: `Ошибка обработки исходящего сообщения ${
prms.queue.nId
} сервером приложений для функции "${func.sCode}" сервиса "${service.sCode}"`,
sMessage: prms.queue.sExecMsg
});
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} catch (e) {
await this.logger.error(
`При отправке уведомления об ошибке обработки исходящего сообщения: ${makeErrorText(e)}`
);
}
}
//Запуск обработки очередного сообщения //Запуск обработки очередного сообщения
processMessage(prms) { processMessage(prms) {
//Проверяем структуру переданного объекта //Проверяем структуру переданного объекта
@ -234,7 +272,7 @@ class OutQueue extends EventEmitter {
//Запись в протокол работы сервиса //Запись в протокол работы сервиса
await self.logger.error(sErrorLog, { nQueueId: prms.queue.nId }); await self.logger.error(sErrorLog, { nQueueId: prms.queue.nId });
//Запись в статус сообщения //Запись в статус сообщения
await this.dbConn.setQueueState({ prms.queue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
sExecMsg: sError, sExecMsg: sError,
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
@ -246,6 +284,9 @@ class OutQueue extends EventEmitter {
: objQueueSchema.NQUEUE_EXEC_STATE_ERR : objQueueSchema.NQUEUE_EXEC_STATE_ERR
}); });
} }
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR)
await this.notifyMessageProcessError(prms);
//Останавливаем обработчик и инкрементируем флаг их доступного количества //Останавливаем обработчик и инкрементируем флаг их доступного количества
try { try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
@ -266,7 +307,7 @@ class OutQueue extends EventEmitter {
nQueueId: prms.queue.nId nQueueId: prms.queue.nId
}); });
//Фиксируем ошибку обработки - статус сообщения //Фиксируем ошибку обработки - статус сообщения
await this.dbConn.setQueueState({ prms.queue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
sExecMsg: makeErrorText(e), sExecMsg: makeErrorText(e),
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
@ -276,6 +317,9 @@ class OutQueue extends EventEmitter {
? prms.queue.nExecState ? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR : objQueueSchema.NQUEUE_EXEC_STATE_ERR
}); });
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR)
await this.notifyMessageProcessError(prms);
//Останавливаем обработчик и инкрементируем флаг их доступного количества //Останавливаем обработчик и инкрементируем флаг их доступного количества
try { try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
@ -323,7 +367,7 @@ class OutQueue extends EventEmitter {
this.processMessage({ queue: outMsgs[i] }); this.processMessage({ queue: outMsgs[i] });
} catch (e) { } catch (e) {
//Фиксируем ошибку обработки сервером приложений - статус сообщения //Фиксируем ошибку обработки сервером приложений - статус сообщения
await this.dbConn.setQueueState({ let queue = await this.dbConn.setQueueState({
nQueueId: outMsgs[i].nId, nQueueId: outMsgs[i].nId,
sExecMsg: makeErrorText(e), sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES, nIncExecCnt: NINC_EXEC_CNT_YES,
@ -334,6 +378,9 @@ class OutQueue extends EventEmitter {
}); });
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await this.logger.error(makeErrorText(e), { nQueueId: outMsgs[i].nId }); await this.logger.error(makeErrorText(e), { nQueueId: outMsgs[i].nId });
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
if (queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR)
await this.notifyMessageProcessError({ queue });
} }
} }
} }

View File

@ -15,6 +15,7 @@ const { DBConnector } = require("../core/db_connector"); //Класс взаим
const { Logger } = require("../core/logger"); //Класс для протоколирования работы const { Logger } = require("../core/logger"); //Класс для протоколирования работы
const { Service } = require("./obj_service"); //Схема валидации сервиса const { Service } = require("./obj_service"); //Схема валидации сервиса
const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса
const { Notifier } = require("../core/notifier"); //Класс рассылки уведомлений
//------------------ //------------------
// Интерфейс модуля // Интерфейс модуля
@ -49,6 +50,16 @@ exports.InQueue = new Schema({
`Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`,
required: path => `Не указаны объект для протоколирования работы (${path})` required: path => `Не указаны объект для протоколирования работы (${path})`
} }
},
//Объект для рассылки уведомлений
notifier: {
type: Notifier,
required: true,
message: {
type: path =>
`Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`,
required: path => `Не указан объект для рассылки уведомлений (${path})`
}
} }
}); });

View File

@ -13,6 +13,7 @@ const { defServices } = require("./obj_services"); //Схема валидаци
const { Queue } = require("./obj_queue"); //Схема валидации сообщения очереди const { Queue } = require("./obj_queue"); //Схема валидации сообщения очереди
const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД
const { Logger } = require("../core/logger"); //Класс для протоколирования работы const { Logger } = require("../core/logger"); //Класс для протоколирования работы
const { Notifier } = require("../core/notifier"); //Класс рассылки уведомлений
//------------- //-------------
// Тело модуля // Тело модуля
@ -55,6 +56,16 @@ exports.OutQueue = new Schema({
`Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`,
required: path => `Не указаны объект для протоколирования работы (${path})` required: path => `Не указаны объект для протоколирования работы (${path})`
} }
},
//Объект для рассылки уведомлений
notifier: {
type: Notifier,
required: true,
message: {
type: path =>
`Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`,
required: path => `Не указан объект для рассылки уведомлений (${path})`
}
} }
}); });
@ -142,6 +153,18 @@ exports.stopQueueProcessor = new Schema({
} }
}); });
//Схема валидации параметров функции оповещения об ошибке исполнения сообщения обмена
exports.notifyMessageProcessError = new Schema({
//Обрабатываемое исходящее сообщение
queue: {
schema: Queue,
required: true,
message: {
required: path => `Не указано обрабатываемое исходящее сообщение (${path})`
}
}
});
//Схема валидации параметров функции передачи исходящего сообшения на обработку //Схема валидации параметров функции передачи исходящего сообшения на обработку
exports.processMessage = new Schema({ exports.processMessage = new Schema({
//Обрабатываемое исходящее сообщение //Обрабатываемое исходящее сообщение