P8-ExchangeService/core/out_queue.js

449 lines
28 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: отработка очереди исходящих сообщений
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий
const ChildProcess = require("child_process"); //Работа с дочерними процессами
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const { NFORCE_YES } = require("../models/common"); //Общие константы и схемы валидации
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
//--------------------------
// Глобальные идентификаторы
//--------------------------
//Типовые события
const SEVT_OUT_QUEUE_STARTED = "OUT_QUEUE_STARTED"; //Обработчик очереди запущен
const SEVT_OUT_QUEUE_STOPPED = "OUT_QUEUE_STOPPED"; //Обработчик очереди остановлен
//Время отложенного старта опроса очереди (мс)
const NDETECTING_LOOP_DELAY = 3000;
//Интервал проверки завершения обработчиков (мс)
const NWORKERS_WAIT_INTERVAL = 1000;
//------------
// Тело модуля
//------------
//Класс очереди исходящих сообщений
class OutQueue extends EventEmitter {
//Конструктор класса
constructor(prms) {
//Создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта для подключения
let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue");
//Если структура объекта в норме
if (!sCheckResult) {
//Список обслуживаемых сервисов
this.services = null;
//Признак функционирования обработчика
this.bWorking = false;
//Параметры очереди
this.outGoing = _.cloneDeep(prms.outGoing);
//Количество доступных обработчиков
this.nWorkersLeft = this.outGoing.nMaxWorkers;
//Идентификатор таймера проверки очереди
this.nDetectingLoopTimeOut = null;
//Запомним подключение к БД
this.dbConn = prms.dbConn;
//Запомним логгер
this.logger = prms.logger;
//Запомним уведомитель
this.notifier = prms.notifier;
//Запомним глобальный адрес прокси-сервера
this.sProxy = prms.sProxy;
//Список обрабатываемых в текущий момент сообщений очереди
this.inProgress = [];
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.outDetectingLoop = this.outDetectingLoop.bind(this);
//Параметры подключения к Kafka
this.kafka = prms.kafka;
//Параметры подключения к MQTT
this.mqtt = prms.mqtt;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Уведомление о запуске обработчика очереди
notifyStarted() {
//Оповестим подписчиков о запуске
this.emit(SEVT_OUT_QUEUE_STARTED);
}
//Уведомление об остановке обработчика очереди
notifyStopped() {
//Оповестим подписчиков об останове
this.emit(SEVT_OUT_QUEUE_STOPPED);
}
//Добавление идентификатора позиции очереди в список обрабатываемых
addInProgress(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.addInProgress,
"Параметры функции добавления идентификатора позиции очереди в список обрабатываемых"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Проверим, что такого идентификатора ещё нет в списке обрабатываемых
const i = this.inProgress.indexOf(prms.nQueueId);
//Если нет - добавим
if (i === -1) this.inProgress.push(prms.nQueueId);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Удаление идентификатора позиции очереди из списка обрабатываемых
rmInProgress(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.rmInProgress,
"Параметры функции удаления идентификатора позиции очереди из списка обрабатываемых"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Если удаляемый идентификатор есть в списке
const i = this.inProgress.indexOf(prms.nQueueId);
//Удалим его
if (i > -1) {
this.inProgress.splice(i, 1);
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Проверка наличия идентификатора позиции очереди в списке обрабатываемых
isInProgress(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.isInProgress,
"Параметры функции проверки наличия идентификатора позиции очереди в списке обрабатываемых"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Проверим наличие идентификатора в списке
return !(this.inProgress.indexOf(prms.nQueueId) === -1);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Старт обработчика
startQueueProcessor(prms) {
//Проверяем структуру переданного объекта для старта обработчика
let sCheckResult = validateObject(prms, prmsOutQueueSchema.startQueueProcessor, "Параметры функции запуска обработчика сообщения очереди");
//Если структура объекта в норме
if (!sCheckResult) {
//Добавляем идентификатор позиции очереди в список обрабатываемых
this.addInProgress({ nQueueId: prms.queue.nId });
//Отдаём команду дочернему процессу обработчика на старт исполнения
prms.proc.send({
nQueueId: prms.queue.nId,
connectSettings: {
...this.dbConn.connectSettings,
nPoolMin: this.outGoing.nPoolMin,
nPoolMax: this.outGoing.nPoolMax,
nPoolIncrement: this.outGoing.nPoolIncrement
},
service: _.find(this.services, { nId: prms.queue.nServiceId }),
function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
nId: prms.queue.nServiceFnId
}),
sProxy: this.sProxy,
kafka: this.kafka,
mqtt: this.mqtt
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Останов обработчика
stopQueueProcessor(prms) {
//Проверяем структуру переданного объекта для останова обработчика
let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди");
//Если структура объекта в норме
if (!sCheckResult) {
//Удаляем идентификатор позиции очереди из списка обрабатываемых
this.rmInProgress({ nQueueId: prms.nQueueId });
//Завершаем дочерний процесс обработчика
prms.proc.kill();
//Увеличиваем количество доступных обработчиков
this.nWorkersLeft++;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Оповещение об ошибке исполнения сообщения
async notifyMessageProcessError(prms) {
try {
//Проверяем структуру переданного объекта для отправки оповещения
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.notifyMessageProcessError,
"Параметры функции оповещения об ошибке исполнения сообщения"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Найдем сервис и функцию, исполнявшие данное сообщение
let service = _.find(this.services, { nId: prms.queue.nServiceId });
let func = _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
nId: prms.queue.nServiceFnId
});
//Если нашли и для функции-обработчика указан признак необходимости оповещения об ошибках
if (service && func && func.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES)
//Отправим уведомление об ошибке отработки в почту
await this.notifier.addMessage({
sTo: func.sErrNtfMail,
sSubject: `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений для функции "${func.sCode}" сервиса "${service.sCode}"`,
sMessage: prms.queue.sExecMsg
});
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} catch (e) {
await this.logger.error(`При отправке уведомления об ошибке обработки исходящего сообщения: ${makeErrorText(e)}`);
}
}
//Запуск обработки очередного сообщения
processMessage(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(prms, prmsOutQueueSchema.processMessage, "Параметры функции запуска обработки очередного сообщения");
//Если структура объекта в норме
if (!sCheckResult) {
//Проверим, что есть доступные обработчики
if (this.nWorkersLeft > 0) {
//Переопределим себя для обращения внутри обработчиков событий
const self = this;
//Запоминаем текущее количество попыток обработки
const nQueueOldExecCnt = prms.queue.nExecCnt;
//Буфер для ошибок (для журнала работы и очереди обмена)
let sErrorLog = null;
let sError = null;
//Создаём новый обработчик сообщений
const proc = ChildProcess.fork("core/out_queue_processor", { silent: false });
//Перехват сообщений обработчика
proc.on("message", async result => {
//Перечитывание не требуется, если выполнено успешно
if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) {
//Перечитываем запись очереди с учетом изменения статуса
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
}
//Проверяем структуру полученного сообщения
let sCheckResult = validateObject(
result,
objOutQueueProcessorSchema.OutQueueProcessorTaskResult,
"Ответ обработчика очереди исходящих сообщений"
);
//Если структура сообщения в норме
if (!sCheckResult) {
//Анализируем результат обработки - если ошибка - фиксируем
if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) {
//Запоминаем ошибку обработчика
sErrorLog = `Ошибка обработки исходящего сообщения: ${result.sMsg}`;
sError = result.sMsg;
} else {
//Ошибки обработки нет, но может быть есть ошибка аутентификации
if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_UNAUTH) {
//Ставим задачу на аутентификацию сервиса
try {
await this.dbConn.putServiceAuthInQueue({
nServiceId: prms.queue.nServiceId,
nForce: NFORCE_YES
});
} catch (e) {
//Отразим в протоколе ошибку постановки задачи на аутентификацию сервиса
await self.logger.error(`Ошибка постановки задачи на аутентификацию сервиса: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
}
}
} else {
//Пришел неожиданный ответ обработчика
sErrorLog = `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`;
sError = sCheckResult;
}
//Фиксируем ошибки, если есть
if (sError) {
//Запись в протокол работы сервиса
await self.logger.error(sErrorLog, { nQueueId: prms.queue.nId });
//Запись в статус сообщения
prms.queue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sError,
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState:
(nQueueOldExecCnt == prms.queue.nExecCnt ? prms.queue.nExecCnt + 1 : prms.queue.nExecCnt) < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms);
//Останавливаем обработчик и инкрементируем флаг их доступного количества
try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
});
//Перехват ошибок обработчика
proc.on("error", async e => {
//Считываем сообщение изменённое обработчиком
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
//Фиксируем ошибку в протоколе работы
await self.logger.error(`Ошибка обработки исходящего сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
//Фиксируем ошибку обработки - статус сообщения
prms.queue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState:
(nQueueOldExecCnt == prms.queue.nExecCnt ? prms.queue.nExecCnt + 1 : prms.queue.nExecCnt) < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms);
//Останавливаем обработчик и инкрементируем флаг их доступного количества
try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
});
//Перехват останова обработчика
proc.on("exit", code => {});
//Запускаем обработчик
this.startQueueProcessor({ queue: prms.queue, proc });
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Перезапуск опроса очереди исходящих сообщений
async restartDetectingLoop() {
//Включаем опрос очереди только если установлен флаг работы
if (this.bWorking) {
this.nDetectingLoopTimeOut = setTimeout(async () => {
await this.outDetectingLoop();
}, this.outGoing.nCheckTimeout);
}
}
//Опрос очереди исходящих сообщений
async outDetectingLoop() {
//Если есть свободные обработчики
if (this.nWorkersLeft > 0) {
//Сходим на сервер за очередным исходящим сообщением
try {
//Заберем столько сообщений, сколько можем обработать одновременно
let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft });
//Если есть сообщения
if (Array.isArray(outMsgs) && outMsgs.length > 0) {
//Обходим их
for (let outMsg of outMsgs) {
//И запускаем обработчики
if (!this.isInProgress({ nQueueId: outMsg.nId })) {
try {
this.processMessage({ queue: outMsg });
} catch (e) {
//Фиксируем ошибку обработки сервером приложений - статус сообщения
let queue = await this.dbConn.setQueueState({
nQueueId: outMsg.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: outMsg.nExecCnt + 1 < outMsg.nRetryAttempts ? outMsg.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await this.logger.error(makeErrorText(e), { nQueueId: outMsg.nId });
//Если исполнение завершилось полностью и с ошибкой - расскажем об этом
if (queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError({ queue });
}
}
}
}
//Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений
await this.restartDetectingLoop();
} catch (e) {
//Фиксируем ошибку в протоколе работы сервера приложений
await this.logger.error(makeErrorText(e));
await this.restartDetectingLoop();
}
} else {
//Нет свободных обработчиков - ждём и перезапускаем цикл опроса
await this.restartDetectingLoop();
}
}
//Запуск обработки очереди исходящих сообщений
startProcessing(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.startProcessing,
"Параметры функции запуска обработки очереди исходящих сообщений"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Выставляем флаг работы
this.bWorking = true;
//запоминаем список обслуживаемых сервисов
this.services = prms.services;
//Начинаем слушать очередь исходящих
setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY);
//И оповещаем всех что запустились
this.notifyStarted();
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Остановка обработки очереди исходящих сообщений
stopProcessing() {
//Выставляем флаг неработы
this.bWorking = false;
//Останавливаем опрос очереди
if (this.nDetectingLoopTimeOut) {
clearTimeout(this.nDetectingLoopTimeOut);
this.nDetectingLoopTimeOut = null;
}
//Ждем завершения работы всех обработчиков
let i = setInterval(() => {
if (!this.bWorking && this.nWorkersLeft >= this.outGoing.nMaxWorkers) {
clearInterval(i);
this.notifyStopped();
}
}, NWORKERS_WAIT_INTERVAL);
}
}
//-----------------
// Интерфейс модуля
//-----------------
exports.SEVT_OUT_QUEUE_STARTED = SEVT_OUT_QUEUE_STARTED;
exports.SEVT_OUT_QUEUE_STOPPED = SEVT_OUT_QUEUE_STOPPED;
exports.OutQueue = OutQueue;