449 lines
28 KiB
JavaScript
449 lines
28 KiB
JavaScript
/*
|
||
Сервис интеграции ПП Парус 8 с WEB API
|
||
Модуль ядра: отработка очереди исходящих сообщений
|
||
*/
|
||
|
||
//------------------------------
|
||
// Подключение внешних библиотек
|
||
//------------------------------
|
||
|
||
const _ = require("lodash"); //Работа с массивами и коллекциями
|
||
const EventEmitter = require("events"); //Обработчик пользовательских событий
|
||
const ChildProcess = require("child_process"); //Работа с дочерними процессами
|
||
const { ServerError } = require("./server_errors"); //Типовая ошибка
|
||
const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
|
||
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
|
||
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
|
||
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
|
||
const { NFORCE_YES } = require("../models/common"); //Общие константы и схемы валидации
|
||
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
|
||
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
|
||
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
|
||
|
||
//--------------------------
|
||
// Глобальные идентификаторы
|
||
//--------------------------
|
||
|
||
//Типовые события
|
||
const SEVT_OUT_QUEUE_STARTED = "OUT_QUEUE_STARTED"; //Обработчик очереди запущен
|
||
const SEVT_OUT_QUEUE_STOPPED = "OUT_QUEUE_STOPPED"; //Обработчик очереди остановлен
|
||
|
||
//Время отложенного старта опроса очереди (мс)
|
||
const NDETECTING_LOOP_DELAY = 3000;
|
||
|
||
//Интервал проверки завершения обработчиков (мс)
|
||
const NWORKERS_WAIT_INTERVAL = 1000;
|
||
|
||
//------------
|
||
// Тело модуля
|
||
//------------
|
||
|
||
//Класс очереди исходящих сообщений
|
||
class OutQueue extends EventEmitter {
|
||
//Конструктор класса
|
||
constructor(prms) {
|
||
//Создадим экземпляр родительского класса
|
||
super();
|
||
//Проверяем структуру переданного объекта для подключения
|
||
let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Список обслуживаемых сервисов
|
||
this.services = null;
|
||
//Признак функционирования обработчика
|
||
this.bWorking = false;
|
||
//Параметры очереди
|
||
this.outGoing = _.cloneDeep(prms.outGoing);
|
||
//Количество доступных обработчиков
|
||
this.nWorkersLeft = this.outGoing.nMaxWorkers;
|
||
//Идентификатор таймера проверки очереди
|
||
this.nDetectingLoopTimeOut = null;
|
||
//Запомним подключение к БД
|
||
this.dbConn = prms.dbConn;
|
||
//Запомним логгер
|
||
this.logger = prms.logger;
|
||
//Запомним уведомитель
|
||
this.notifier = prms.notifier;
|
||
//Запомним глобальный адрес прокси-сервера
|
||
this.sProxy = prms.sProxy;
|
||
//Список обрабатываемых в текущий момент сообщений очереди
|
||
this.inProgress = [];
|
||
//Привяжем методы к указателю на себя для использования в обработчиках событий
|
||
this.outDetectingLoop = this.outDetectingLoop.bind(this);
|
||
//Параметры подключения к Kafka
|
||
this.kafka = prms.kafka;
|
||
//Параметры подключения к MQTT
|
||
this.mqtt = prms.mqtt;
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Уведомление о запуске обработчика очереди
|
||
notifyStarted() {
|
||
//Оповестим подписчиков о запуске
|
||
this.emit(SEVT_OUT_QUEUE_STARTED);
|
||
}
|
||
//Уведомление об остановке обработчика очереди
|
||
notifyStopped() {
|
||
//Оповестим подписчиков об останове
|
||
this.emit(SEVT_OUT_QUEUE_STOPPED);
|
||
}
|
||
//Добавление идентификатора позиции очереди в список обрабатываемых
|
||
addInProgress(prms) {
|
||
//Проверяем структуру переданного объекта
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueSchema.addInProgress,
|
||
"Параметры функции добавления идентификатора позиции очереди в список обрабатываемых"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Проверим, что такого идентификатора ещё нет в списке обрабатываемых
|
||
const i = this.inProgress.indexOf(prms.nQueueId);
|
||
//Если нет - добавим
|
||
if (i === -1) this.inProgress.push(prms.nQueueId);
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Удаление идентификатора позиции очереди из списка обрабатываемых
|
||
rmInProgress(prms) {
|
||
//Проверяем структуру переданного объекта
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueSchema.rmInProgress,
|
||
"Параметры функции удаления идентификатора позиции очереди из списка обрабатываемых"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Если удаляемый идентификатор есть в списке
|
||
const i = this.inProgress.indexOf(prms.nQueueId);
|
||
//Удалим его
|
||
if (i > -1) {
|
||
this.inProgress.splice(i, 1);
|
||
}
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Проверка наличия идентификатора позиции очереди в списке обрабатываемых
|
||
isInProgress(prms) {
|
||
//Проверяем структуру переданного объекта
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueSchema.isInProgress,
|
||
"Параметры функции проверки наличия идентификатора позиции очереди в списке обрабатываемых"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Проверим наличие идентификатора в списке
|
||
return !(this.inProgress.indexOf(prms.nQueueId) === -1);
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Старт обработчика
|
||
startQueueProcessor(prms) {
|
||
//Проверяем структуру переданного объекта для старта обработчика
|
||
let sCheckResult = validateObject(prms, prmsOutQueueSchema.startQueueProcessor, "Параметры функции запуска обработчика сообщения очереди");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Добавляем идентификатор позиции очереди в список обрабатываемых
|
||
this.addInProgress({ nQueueId: prms.queue.nId });
|
||
//Отдаём команду дочернему процессу обработчика на старт исполнения
|
||
prms.proc.send({
|
||
nQueueId: prms.queue.nId,
|
||
connectSettings: {
|
||
...this.dbConn.connectSettings,
|
||
nPoolMin: this.outGoing.nPoolMin,
|
||
nPoolMax: this.outGoing.nPoolMax,
|
||
nPoolIncrement: this.outGoing.nPoolIncrement
|
||
},
|
||
service: _.find(this.services, { nId: prms.queue.nServiceId }),
|
||
function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
|
||
nId: prms.queue.nServiceFnId
|
||
}),
|
||
sProxy: this.sProxy,
|
||
kafka: this.kafka,
|
||
mqtt: this.mqtt
|
||
});
|
||
//Уменьшаем количество доступных обработчиков
|
||
this.nWorkersLeft--;
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Останов обработчика
|
||
stopQueueProcessor(prms) {
|
||
//Проверяем структуру переданного объекта для останова обработчика
|
||
let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Удаляем идентификатор позиции очереди из списка обрабатываемых
|
||
this.rmInProgress({ nQueueId: prms.nQueueId });
|
||
//Завершаем дочерний процесс обработчика
|
||
prms.proc.kill();
|
||
//Увеличиваем количество доступных обработчиков
|
||
this.nWorkersLeft++;
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Оповещение об ошибке исполнения сообщения
|
||
async notifyMessageProcessError(prms) {
|
||
try {
|
||
//Проверяем структуру переданного объекта для отправки оповещения
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueSchema.notifyMessageProcessError,
|
||
"Параметры функции оповещения об ошибке исполнения сообщения"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Найдем сервис и функцию, исполнявшие данное сообщение
|
||
let service = _.find(this.services, { nId: prms.queue.nServiceId });
|
||
let func = _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
|
||
nId: prms.queue.nServiceFnId
|
||
});
|
||
//Если нашли и для функции-обработчика указан признак необходимости оповещения об ошибках
|
||
if (service && func && func.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES)
|
||
//Отправим уведомление об ошибке отработки в почту
|
||
await this.notifier.addMessage({
|
||
sTo: func.sErrNtfMail,
|
||
sSubject: `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений для функции "${func.sCode}" сервиса "${service.sCode}"`,
|
||
sMessage: prms.queue.sExecMsg
|
||
});
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
} catch (e) {
|
||
await this.logger.error(`При отправке уведомления об ошибке обработки исходящего сообщения: ${makeErrorText(e)}`);
|
||
}
|
||
}
|
||
//Запуск обработки очередного сообщения
|
||
processMessage(prms) {
|
||
//Проверяем структуру переданного объекта
|
||
let sCheckResult = validateObject(prms, prmsOutQueueSchema.processMessage, "Параметры функции запуска обработки очередного сообщения");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Проверим, что есть доступные обработчики
|
||
if (this.nWorkersLeft > 0) {
|
||
//Переопределим себя для обращения внутри обработчиков событий
|
||
const self = this;
|
||
//Запоминаем текущее количество попыток обработки
|
||
const nQueueOldExecCnt = prms.queue.nExecCnt;
|
||
//Буфер для ошибок (для журнала работы и очереди обмена)
|
||
let sErrorLog = null;
|
||
let sError = null;
|
||
//Создаём новый обработчик сообщений
|
||
const proc = ChildProcess.fork("core/out_queue_processor", { silent: false });
|
||
//Перехват сообщений обработчика
|
||
proc.on("message", async result => {
|
||
//Перечитывание не требуется, если выполнено успешно
|
||
if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) {
|
||
//Перечитываем запись очереди с учетом изменения статуса
|
||
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
|
||
}
|
||
//Проверяем структуру полученного сообщения
|
||
let sCheckResult = validateObject(
|
||
result,
|
||
objOutQueueProcessorSchema.OutQueueProcessorTaskResult,
|
||
"Ответ обработчика очереди исходящих сообщений"
|
||
);
|
||
//Если структура сообщения в норме
|
||
if (!sCheckResult) {
|
||
//Анализируем результат обработки - если ошибка - фиксируем
|
||
if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) {
|
||
//Запоминаем ошибку обработчика
|
||
sErrorLog = `Ошибка обработки исходящего сообщения: ${result.sMsg}`;
|
||
sError = result.sMsg;
|
||
} else {
|
||
//Ошибки обработки нет, но может быть есть ошибка аутентификации
|
||
if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_UNAUTH) {
|
||
//Ставим задачу на аутентификацию сервиса
|
||
try {
|
||
await this.dbConn.putServiceAuthInQueue({
|
||
nServiceId: prms.queue.nServiceId,
|
||
nForce: NFORCE_YES
|
||
});
|
||
} catch (e) {
|
||
//Отразим в протоколе ошибку постановки задачи на аутентификацию сервиса
|
||
await self.logger.error(`Ошибка постановки задачи на аутентификацию сервиса: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
//Пришел неожиданный ответ обработчика
|
||
sErrorLog = `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`;
|
||
sError = sCheckResult;
|
||
}
|
||
//Фиксируем ошибки, если есть
|
||
if (sError) {
|
||
//Запись в протокол работы сервиса
|
||
await self.logger.error(sErrorLog, { nQueueId: prms.queue.nId });
|
||
//Запись в статус сообщения
|
||
prms.queue = await this.dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
sExecMsg: sError,
|
||
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState:
|
||
(nQueueOldExecCnt == prms.queue.nExecCnt ? prms.queue.nExecCnt + 1 : prms.queue.nExecCnt) < prms.queue.nRetryAttempts
|
||
? prms.queue.nExecState
|
||
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
}
|
||
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
|
||
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms);
|
||
//Останавливаем обработчик и инкрементируем флаг их доступного количества
|
||
try {
|
||
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
|
||
} catch (e) {
|
||
//Отразим в протоколе ошибку останова
|
||
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
});
|
||
//Перехват ошибок обработчика
|
||
proc.on("error", async e => {
|
||
//Считываем сообщение изменённое обработчиком
|
||
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
|
||
//Фиксируем ошибку в протоколе работы
|
||
await self.logger.error(`Ошибка обработки исходящего сообщения: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
//Фиксируем ошибку обработки - статус сообщения
|
||
prms.queue = await this.dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
sExecMsg: makeErrorText(e),
|
||
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState:
|
||
(nQueueOldExecCnt == prms.queue.nExecCnt ? prms.queue.nExecCnt + 1 : prms.queue.nExecCnt) < prms.queue.nRetryAttempts
|
||
? prms.queue.nExecState
|
||
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
|
||
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms);
|
||
//Останавливаем обработчик и инкрементируем флаг их доступного количества
|
||
try {
|
||
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
|
||
} catch (e) {
|
||
//Отразим в протоколе ошибку останова
|
||
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
});
|
||
//Перехват останова обработчика
|
||
proc.on("exit", code => {});
|
||
//Запускаем обработчик
|
||
this.startQueueProcessor({ queue: prms.queue, proc });
|
||
}
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Перезапуск опроса очереди исходящих сообщений
|
||
async restartDetectingLoop() {
|
||
//Включаем опрос очереди только если установлен флаг работы
|
||
if (this.bWorking) {
|
||
this.nDetectingLoopTimeOut = setTimeout(async () => {
|
||
await this.outDetectingLoop();
|
||
}, this.outGoing.nCheckTimeout);
|
||
}
|
||
}
|
||
//Опрос очереди исходящих сообщений
|
||
async outDetectingLoop() {
|
||
//Если есть свободные обработчики
|
||
if (this.nWorkersLeft > 0) {
|
||
//Сходим на сервер за очередным исходящим сообщением
|
||
try {
|
||
//Заберем столько сообщений, сколько можем обработать одновременно
|
||
let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft });
|
||
//Если есть сообщения
|
||
if (Array.isArray(outMsgs) && outMsgs.length > 0) {
|
||
//Обходим их
|
||
for (let outMsg of outMsgs) {
|
||
//И запускаем обработчики
|
||
if (!this.isInProgress({ nQueueId: outMsg.nId })) {
|
||
try {
|
||
this.processMessage({ queue: outMsg });
|
||
} catch (e) {
|
||
//Фиксируем ошибку обработки сервером приложений - статус сообщения
|
||
let queue = await this.dbConn.setQueueState({
|
||
nQueueId: outMsg.nId,
|
||
sExecMsg: makeErrorText(e),
|
||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
||
nExecState: outMsg.nExecCnt + 1 < outMsg.nRetryAttempts ? outMsg.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
|
||
await this.logger.error(makeErrorText(e), { nQueueId: outMsg.nId });
|
||
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
|
||
if (queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError({ queue });
|
||
}
|
||
}
|
||
}
|
||
}
|
||
//Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений
|
||
await this.restartDetectingLoop();
|
||
} catch (e) {
|
||
//Фиксируем ошибку в протоколе работы сервера приложений
|
||
await this.logger.error(makeErrorText(e));
|
||
await this.restartDetectingLoop();
|
||
}
|
||
} else {
|
||
//Нет свободных обработчиков - ждём и перезапускаем цикл опроса
|
||
await this.restartDetectingLoop();
|
||
}
|
||
}
|
||
//Запуск обработки очереди исходящих сообщений
|
||
startProcessing(prms) {
|
||
//Проверяем структуру переданного объекта для старта
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueSchema.startProcessing,
|
||
"Параметры функции запуска обработки очереди исходящих сообщений"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Выставляем флаг работы
|
||
this.bWorking = true;
|
||
//запоминаем список обслуживаемых сервисов
|
||
this.services = prms.services;
|
||
//Начинаем слушать очередь исходящих
|
||
setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY);
|
||
//И оповещаем всех что запустились
|
||
this.notifyStarted();
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Остановка обработки очереди исходящих сообщений
|
||
stopProcessing() {
|
||
//Выставляем флаг неработы
|
||
this.bWorking = false;
|
||
//Останавливаем опрос очереди
|
||
if (this.nDetectingLoopTimeOut) {
|
||
clearTimeout(this.nDetectingLoopTimeOut);
|
||
this.nDetectingLoopTimeOut = null;
|
||
}
|
||
//Ждем завершения работы всех обработчиков
|
||
let i = setInterval(() => {
|
||
if (!this.bWorking && this.nWorkersLeft >= this.outGoing.nMaxWorkers) {
|
||
clearInterval(i);
|
||
this.notifyStopped();
|
||
}
|
||
}, NWORKERS_WAIT_INTERVAL);
|
||
}
|
||
}
|
||
|
||
//-----------------
|
||
// Интерфейс модуля
|
||
//-----------------
|
||
|
||
exports.SEVT_OUT_QUEUE_STARTED = SEVT_OUT_QUEUE_STARTED;
|
||
exports.SEVT_OUT_QUEUE_STOPPED = SEVT_OUT_QUEUE_STOPPED;
|
||
exports.OutQueue = OutQueue;
|