Альтернативный обработчик очереди - полностью автономный, создающий собственное подключение к БД

This commit is contained in:
Mikhail Chechnev 2018-12-06 21:44:51 +03:00
parent d3ef97f692
commit 39fcf9fd10
3 changed files with 674 additions and 1 deletions

View File

@ -9,11 +9,12 @@
const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД
const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений
const oq = require("./out_queue2"); //Прослушивание очереди исходящих сообщений
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { validateObject } = require("./utils"); //Вспомогательные функции
const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const prmsAppSchema = require("../models/prms_app"); //Схема валидации параметров функций класса
//------------
// Тело модуля
//------------

291
core/out_queue2.js Normal file
View File

@ -0,0 +1,291 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: отработка очереди исходящих сообщений
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий
const ChildProcess = require("child_process"); //Работа с дочерними процессами
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { validateObject } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
//--------------------------
// Глобальные идентификаторы
//--------------------------
//Типовые события
const SEVT_OUT_QUEUE_STARTED = "OUT_QUEUE_STARTED"; //Обработчик очереди запущен
const SEVT_OUT_QUEUE_STOPPED = "OUT_QUEUE_STOPPED"; //Обработчик очереди остановлен
//Время отложенного старта опроса очереди (мс)
const NDETECTING_LOOP_DELAY = 3000;
//Интервал проверки завершения обработчиков (мс)
const NWORKERS_WAIT_INTERVAL = 1000;
//------------
// Тело модуля
//------------
//Класс очереди сообщений
class OutQueue extends EventEmitter {
//Конструктор класса
constructor(prms) {
//Создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта для подключения
let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue");
//Если структура объекта в норме
if (!sCheckResult) {
//Список обслуживаемых сервисов
this.services = null;
//Признак функционирования обработчика
this.bWorking = false;
//Параметры очереди
this.outGoing = _.cloneDeep(prms.outGoing);
//Количество доступных обработчиков
this.nWorkersLeft = this.outGoing.nMaxWorkers;
//Идентификатор таймера проверки очереди
this.nDetectingLoopTimeOut = null;
//Запомним подключение к БД
this.dbConn = prms.dbConn;
//Запомним логгер
this.logger = prms.logger;
//Список обрабатываемых в текущий момент сообщений очереди
this.inProgress = [];
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.outDetectingLoop = this.outDetectingLoop.bind(this);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Уведомление об остановке обработчика очереди
notifyStarted() {
//оповестим подписчиков о появлении нового отчета
this.emit(SEVT_OUT_QUEUE_STARTED);
}
//Уведомление об остановке обработчика очереди
notifyStopped() {
//оповестим подписчиков о появлении нового отчета
this.emit(SEVT_OUT_QUEUE_STOPPED);
}
//Добавление идентификатора позиции очереди в список обрабатываемых
addInProgress(nId) {
const i = this.inProgress.indexOf(nId);
if (i === -1) this.inProgress.push(nId);
}
//Удаление идентификатора позиции очереди из списка обрабатываемых
rmInProgress(nId) {
const i = this.inProgress.indexOf(nId);
if (i > -1) {
this.inProgress.splice(i, 1);
}
}
//Проверка наличия идентификатора позиции очереди в списке обрабатываемых
isInProgress(nId) {
return !(this.inProgress.indexOf(nId) === -1);
}
//Запуск обработки очередного сообщения
processMessage(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.processMessage,
"Параметры функции запуска обработки очередного сообщения"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Проверим, что есть доступные обработчики
if (this.nWorkersLeft > 0) {
//Переопределим себя для обращения внутри обработчиков событий
const self = this;
//Создаём новый обработчик сообщений
const proc = ChildProcess.fork("core/out_queue_processor2", { silent: false });
//Текущее состояние сообщения
let curQueue = null;
//Перехват сообщений обработчика
proc.on("message", async result => {
//Проверяем структуру полученного сообщения
/*
let sCheckResult = validateObject(
result,
objOutQueueProcessorSchema.OutQueueProcessorTaskResult,
"Ответ обработчика очереди исходящих сообщений"
);
//Если структура сообщения в норме
if (!sCheckResult) {
*/
/*
} else {
//Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений
await self.logger.error(
`Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`,
{ nQueueId: prms.queue.nId }
);
}
*/
if (result.sExecResult == "ERR") {
await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sExecMsg}`, {
nQueueId: prms.queue.nId
});
//Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: result.sExecMsg,
nIncExecCnt: NINC_EXEC_CNT_YES
});
}
//Останавливаем обработчик и инкрементируем флаг их доступного количества
this.rmInProgress(prms.queue.nId);
proc.kill();
this.nWorkersLeft++;
});
//Перехват ошибок обработчика
proc.on("error", async e => {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку в протоколе работы
await self.logger.error(`Ошибка обработки исходящего сообщения: ${sErr}`, {
nQueueId: prms.queue.nId
});
//Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Останавливаем обработчик и инкрементируем флаг их доступного количества
this.rmInProgress(prms.queue.nId);
proc.kill();
this.nWorkersLeft++;
});
//Перехват останова обработчика
proc.on("exit", code => {});
//Запускаем обработчик
this.addInProgress(prms.queue.nId);
proc.send({
nQueueId: prms.queue.nId,
connectSettings: self.dbConn.connectSettings
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Перезапуск опроса очереди исходящих сообщений
async restartDetectingLoop() {
//Включаем опрос очереди только если установлен флаг работы
if (this.bWorking) {
this.nDetectingLoopTimeOut = await setTimeout(async () => {
await this.outDetectingLoop();
}, this.outGoing.nCheckTimeout);
}
}
//Опрос очереди исходящих сообщений
async outDetectingLoop() {
//Если есть свободные обработчики
if (this.nWorkersLeft > 0) {
//Сходим на сервер за очередным исходящим сообщением
try {
//Заберем столько сообщений, сколько можем обработать одновременно
let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft });
//Если есть сообщения
if (Array.isArray(outMsgs) && outMsgs.length > 0) {
//Обходим их
for (let i = 0; i < outMsgs.length; i++) {
//И запускаем обработчики
if (!this.isInProgress(outMsgs[i].nId)) {
try {
this.processMessage({ queue: outMsgs[i] });
} catch (e) {
//Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
await this.dbConn.setQueueState({
nQueueId: outMsgs[i].nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await this.logger.error(sErr, { nQueueId: outMsgs[i].nId });
}
}
}
}
//Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений
await this.restartDetectingLoop();
} catch (e) {
//Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку в протоколе работы сервера приложений
await this.logger.error(sErr);
await this.restartDetectingLoop();
}
} else {
//Нет свободных обработчиков - ждём и перезапускаем цикл опроса
await this.restartDetectingLoop();
}
}
//Запуск обработки очереди исходящих сообщений
startProcessing(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.startProcessing,
"Параметры функции запуска обработки очереди исходящих сообщений"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Выставляем флаг работы
this.bWorking = true;
//запоминаем список обслуживаемых сервисов
this.services = prms.services;
//Начинаем слушать очередь исходящих
setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY);
//И оповещаем всех что запустились
this.notifyStarted();
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Остановка обработки очереди исходящих сообщений
stopProcessing() {
//Выставляем флаг неработы
this.bWorking = false;
//Останавливаем опрос очереди
if (this.nDetectingLoopTimeOut) {
clearTimeout(this.nDetectingLoopTimeOut);
this.nDetectingLoopTimeOut = null;
}
//Ждем завершения работы всех обработчиков
let i = setInterval(() => {
if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) {
clearInterval(i);
this.notifyStopped();
}
}, NWORKERS_WAIT_INTERVAL);
}
}
//-----------------
// Интерфейс модуля
//-----------------
exports.SEVT_OUT_QUEUE_STARTED = SEVT_OUT_QUEUE_STARTED;
exports.SEVT_OUT_QUEUE_STOPPED = SEVT_OUT_QUEUE_STOPPED;
exports.OutQueue = OutQueue;

View File

@ -0,0 +1,381 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: обработчик исходящего сообщения
*/
//----------------------
// Подключение библиотек
//----------------------
require("module-alias/register"); //Поддержка псевонимов при подключении модулей
const _ = require("lodash"); //Работа с массивами и коллекциями
const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД
const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции
const { ServerError } = require("./server_errors"); //Типовая ошибка
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений
const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const {
SERR_UNEXPECTED,
SERR_MODULES_BAD_INTERFACE,
SERR_OBJECT_BAD_INTERFACE,
SERR_MODULES_NO_MODULE_SPECIFIED
} = require("./constants"); //Глобальные константы
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
//----------
// Константы
//----------
//--------------------------
// Глобальные идентификаторы
//--------------------------
let dbConn = null; //Подключение к БД
let logger = null; //Протоколирование работы
//------------
// Тело модуля
//------------
//Отправка родительскому процессу ошибки обработки сообщения сервером приложений
const sendErrorResult = sMessage => {
process.send({
sExecResult: "ERR",
sExecMsg: sMessage
});
};
//Отправка родительскому процессу успеха обработки сообщения сервером приложений
const sendOKResult = () => {
process.send({
sExecResult: "OK",
sExecMsg: null
});
};
//Запись в файл !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!!
const writeToFile = queue => {
return new Promise((resolve, reject) => {
const fs = require("fs");
fs.writeFile("c:/repos/temp/" + queue.nId, queue.blMsg, err => {
if (err) {
reject(new ServerError(SERR_UNEXPECTED, `Ошибка отработки сообщения ${prms.queue.nId}`));
} else {
resolve();
}
});
});
};
//Запуск обработки сообщения сервером приложений
const appProcess = async prms => {
//Обработанное сообщение
let newQueue = null;
//Обрабатываем
try {
//Фиксируем начало исполнения сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Скажем что начали обработку
await logger.info(
`Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
if (prms.queue.blMsg) {
await writeToFile(prms.queue);
let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId;
//Фиксируем успех исполнения
newQueue = await dbConn.setQueueAppSrvResult({
nQueueId: prms.queue.nId,
blMsg: new Buffer(sMsg),
blResp: new Buffer("REPLAY ON " + prms.queue.nId)
});
//Фиксируем успешное исполнение сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, {
nQueueId: prms.queue.nId
});
} else {
throw new ServerError(
SERR_UNEXPECTED,
`Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки`
);
}
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${sErr}`, {
nQueueId: prms.queue.nId
});
}
//Возвращаем результат
return newQueue;
};
//Запуск обработки сообщения сервером БД
const dbProcess = async prms => {
//Проверяем структуру переданного объекта для старта
//let sCheckResult = validateObject(
// prms,
// prmsOutQueueSchema.dbProcess,
// "Параметры функции запуска обработки ообщения сервером БД"
//);
//Если структура объекта в норме
//if (!sCheckResult) {
//Обрабатываем
try {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Скажем что начали обработку
await logger.info(
`Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
//Вызов обработчика БД
await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId });
//Фиксируем успешное исполнение сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
nQueueId: prms.queue.nId
});
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`, {
nQueueId: prms.queue.nId
});
}
//} else {
// throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//}
};
//Протоколирование предупреждения о ненадлежащем статусе сообщения
const warnBadStateForProcess = async prms => {
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
await logger.warn(`Cообщение ${prms.queue.nId} в статусе ${prms.queue.sExecState} попало в очередь обработчика`, {
nQueueId: prms.queue.nId
});
};
//Обработка задачи
const processTask = async prms => {
//Проверяем параметры
/*
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.processTask,
"Параметры функции обработки задачи"
);
*/
//Если параметры в норме
//if (!sCheckResult) {
let q = null;
try {
//Создаём подключение к БД
dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings });
//Создаём логгер для протоколирования работы
logger = new lg.Logger();
//Подключим логгер к БД (и отключим когда надо)
dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => {
logger.setDBConnector(dbConn, true);
});
dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => {
logger.removeDBConnector();
});
//Подключаемся к БД
await dbConn.connect();
//Считываем запись очереди
q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId });
//Далее работаем от статуса считанной записи
switch (q.nExecState) {
//Поставлено в очередь
case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: {
//Запускаем обработку сервером приложений
let res = await appProcess({ queue: q });
//И если она успешно завершилась - обработку сервоером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res });
break;
}
//Обрабатывается сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
//Ошибка обработки сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: {
//Если ещё есть попытки отработки
if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером приложений
let res = await appProcess({ queue: q });
//И если она успешно завершилась - обработку сервоером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res });
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: q.sExecMsg,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
break;
}
//Успешно обработано сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: {
//Запускаем обработку в БД
await dbProcess({ queue: q });
break;
}
//Обрабатывается в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
//Ошибка обработки в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: {
//Если ещё есть попытки отработки
if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером БД
await dbProcess({ queue: q });
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: q.sExecMsg,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
break;
}
//Успешно обработано в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: {
//Финализируем
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: null,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
break;
}
//Обработано с ошибками
case objQueueSchema.NQUEUE_EXEC_STATE_ERR: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
//Обработано успешно
case objQueueSchema.NQUEUE_EXEC_STATE_OK: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
default: {
//Ничего не делаем
break;
}
}
//Отключаемся от БД
if (dbConn) await dbConn.disconnect();
//Отправляем успех
sendOKResult();
} catch (e) {
//Отключаемся от БД
if (dbConn) await dbConn.disconnect();
//Отправляем ошибку
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
sendErrorResult(sErr);
}
//Отправим родителю информацию о том, что закончили обработку
//} else {
// sendErrorResult({ sMessage: sCheckResult });
//}
};
//---------------------------------
// Управление процессом обработчика
//---------------------------------
//Перехват CTRL + C (останов процесса)
process.on("SIGINT", () => {});
//Перехват CTRL + \ (останов процесса)
process.on("SIGQUIT", () => {});
//Перехват мягкого останова процесса
process.on("SIGTERM", () => {});
//Перехват ошибок
process.on("uncaughtException", e => {
//Отправляем ошибку родительскому процессу
sendErrorResult(e.message);
});
//Приём сообщений
process.on("message", task => {
//Проверяем структуру переданного сообщения
/*
let sCheckResult = validateObject(
task,
objOutQueueProcessorSchema.OutQueueProcessorTask,
"Задача обработчика очереди исходящих сообщений"
);
*/
//Если структура объекта в норме
//if (!sCheckResult) {
//Запускаем обработку
processTask({ task });
//} else {
// throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//}
});