From 39fcf9fd10d972dc9df7276c1d61f495ed504ad6 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Thu, 6 Dec 2018 21:44:51 +0300 Subject: [PATCH] =?UTF-8?q?=D0=90=D0=BB=D1=8C=D1=82=D0=B5=D1=80=D0=BD?= =?UTF-8?q?=D0=B0=D1=82=D0=B8=D0=B2=D0=BD=D1=8B=D0=B9=20=D0=BE=D0=B1=D1=80?= =?UTF-8?q?=D0=B0=D0=B1=D0=BE=D1=82=D1=87=D0=B8=D0=BA=20=D0=BE=D1=87=D0=B5?= =?UTF-8?q?=D1=80=D0=B5=D0=B4=D0=B8=20-=20=D0=BF=D0=BE=D0=BB=D0=BD=D0=BE?= =?UTF-8?q?=D1=81=D1=82=D1=8C=D1=8E=20=D0=B0=D0=B2=D1=82=D0=BE=D0=BD=D0=BE?= =?UTF-8?q?=D0=BC=D0=BD=D1=8B=D0=B9,=20=D1=81=D0=BE=D0=B7=D0=B4=D0=B0?= =?UTF-8?q?=D1=8E=D1=89=D0=B8=D0=B9=20=D1=81=D0=BE=D0=B1=D1=81=D1=82=D0=B2?= =?UTF-8?q?=D0=B5=D0=BD=D0=BD=D0=BE=D0=B5=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB?= =?UTF-8?q?=D1=8E=D1=87=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BA=20=D0=91=D0=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/app.js | 3 +- core/out_queue2.js | 291 ++++++++++++++++++++++++++ core/out_queue_processor2.js | 381 +++++++++++++++++++++++++++++++++++ 3 files changed, 674 insertions(+), 1 deletion(-) create mode 100644 core/out_queue2.js create mode 100644 core/out_queue_processor2.js diff --git a/core/app.js b/core/app.js index 5893ec6..41c06fa 100644 --- a/core/app.js +++ b/core/app.js @@ -9,11 +9,12 @@ const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД -const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений +const oq = require("./out_queue2"); //Прослушивание очереди исходящих сообщений const { ServerError } = require("./server_errors"); //Типовая ошибка const { validateObject } = require("./utils"); //Вспомогательные функции const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const prmsAppSchema = require("../models/prms_app"); //Схема валидации параметров функций класса + //------------ // Тело модуля //------------ diff --git a/core/out_queue2.js b/core/out_queue2.js new file mode 100644 index 0000000..e46161e --- /dev/null +++ b/core/out_queue2.js @@ -0,0 +1,291 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Модуль ядра: отработка очереди исходящих сообщений +*/ + +//------------------------------ +// Подключение внешних библиотек +//------------------------------ + +const _ = require("lodash"); //Работа с массивами и коллекциями +const EventEmitter = require("events"); //Обработчик пользовательских событий +const ChildProcess = require("child_process"); //Работа с дочерними процессами +const { ServerError } = require("./server_errors"); //Типовая ошибка +const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы +const { validateObject } = require("./utils"); //Вспомогательные функции +const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди +const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди +const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса + +//-------------------------- +// Глобальные идентификаторы +//-------------------------- + +//Типовые события +const SEVT_OUT_QUEUE_STARTED = "OUT_QUEUE_STARTED"; //Обработчик очереди запущен +const SEVT_OUT_QUEUE_STOPPED = "OUT_QUEUE_STOPPED"; //Обработчик очереди остановлен + +//Время отложенного старта опроса очереди (мс) +const NDETECTING_LOOP_DELAY = 3000; + +//Интервал проверки завершения обработчиков (мс) +const NWORKERS_WAIT_INTERVAL = 1000; + +//------------ +// Тело модуля +//------------ + +//Класс очереди сообщений +class OutQueue extends EventEmitter { + //Конструктор класса + constructor(prms) { + //Создадим экземпляр родительского класса + super(); + //Проверяем структуру переданного объекта для подключения + let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue"); + //Если структура объекта в норме + if (!sCheckResult) { + //Список обслуживаемых сервисов + this.services = null; + //Признак функционирования обработчика + this.bWorking = false; + //Параметры очереди + this.outGoing = _.cloneDeep(prms.outGoing); + //Количество доступных обработчиков + this.nWorkersLeft = this.outGoing.nMaxWorkers; + //Идентификатор таймера проверки очереди + this.nDetectingLoopTimeOut = null; + //Запомним подключение к БД + this.dbConn = prms.dbConn; + //Запомним логгер + this.logger = prms.logger; + //Список обрабатываемых в текущий момент сообщений очереди + this.inProgress = []; + //Привяжем методы к указателю на себя для использования в обработчиках событий + this.outDetectingLoop = this.outDetectingLoop.bind(this); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Уведомление об остановке обработчика очереди + notifyStarted() { + //оповестим подписчиков о появлении нового отчета + this.emit(SEVT_OUT_QUEUE_STARTED); + } + //Уведомление об остановке обработчика очереди + notifyStopped() { + //оповестим подписчиков о появлении нового отчета + this.emit(SEVT_OUT_QUEUE_STOPPED); + } + //Добавление идентификатора позиции очереди в список обрабатываемых + addInProgress(nId) { + const i = this.inProgress.indexOf(nId); + if (i === -1) this.inProgress.push(nId); + } + //Удаление идентификатора позиции очереди из списка обрабатываемых + rmInProgress(nId) { + const i = this.inProgress.indexOf(nId); + if (i > -1) { + this.inProgress.splice(i, 1); + } + } + //Проверка наличия идентификатора позиции очереди в списке обрабатываемых + isInProgress(nId) { + return !(this.inProgress.indexOf(nId) === -1); + } + //Запуск обработки очередного сообщения + processMessage(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.processMessage, + "Параметры функции запуска обработки очередного сообщения" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Проверим, что есть доступные обработчики + if (this.nWorkersLeft > 0) { + //Переопределим себя для обращения внутри обработчиков событий + const self = this; + //Создаём новый обработчик сообщений + const proc = ChildProcess.fork("core/out_queue_processor2", { silent: false }); + //Текущее состояние сообщения + let curQueue = null; + //Перехват сообщений обработчика + proc.on("message", async result => { + //Проверяем структуру полученного сообщения + /* + let sCheckResult = validateObject( + result, + objOutQueueProcessorSchema.OutQueueProcessorTaskResult, + "Ответ обработчика очереди исходящих сообщений" + ); + //Если структура сообщения в норме + if (!sCheckResult) { + */ + + /* + } else { + //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений + await self.logger.error( + `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, + { nQueueId: prms.queue.nId } + ); + } + */ + if (result.sExecResult == "ERR") { + await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sExecMsg}`, { + nQueueId: prms.queue.nId + }); + //Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: result.sExecMsg, + nIncExecCnt: NINC_EXEC_CNT_YES + }); + } + //Останавливаем обработчик и инкрементируем флаг их доступного количества + this.rmInProgress(prms.queue.nId); + proc.kill(); + this.nWorkersLeft++; + }); + //Перехват ошибок обработчика + proc.on("error", async e => { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку в протоколе работы + await self.logger.error(`Ошибка обработки исходящего сообщения: ${sErr}`, { + nQueueId: prms.queue.nId + }); + //Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES + }); + //Останавливаем обработчик и инкрементируем флаг их доступного количества + this.rmInProgress(prms.queue.nId); + proc.kill(); + this.nWorkersLeft++; + }); + //Перехват останова обработчика + proc.on("exit", code => {}); + //Запускаем обработчик + this.addInProgress(prms.queue.nId); + proc.send({ + nQueueId: prms.queue.nId, + connectSettings: self.dbConn.connectSettings + }); + //Уменьшаем количество доступных обработчиков + this.nWorkersLeft--; + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Перезапуск опроса очереди исходящих сообщений + async restartDetectingLoop() { + //Включаем опрос очереди только если установлен флаг работы + if (this.bWorking) { + this.nDetectingLoopTimeOut = await setTimeout(async () => { + await this.outDetectingLoop(); + }, this.outGoing.nCheckTimeout); + } + } + //Опрос очереди исходящих сообщений + async outDetectingLoop() { + //Если есть свободные обработчики + if (this.nWorkersLeft > 0) { + //Сходим на сервер за очередным исходящим сообщением + try { + //Заберем столько сообщений, сколько можем обработать одновременно + let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft }); + //Если есть сообщения + if (Array.isArray(outMsgs) && outMsgs.length > 0) { + //Обходим их + for (let i = 0; i < outMsgs.length; i++) { + //И запускаем обработчики + if (!this.isInProgress(outMsgs[i].nId)) { + try { + this.processMessage({ queue: outMsgs[i] }); + } catch (e) { + //Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) + await this.dbConn.setQueueState({ + nQueueId: outMsgs[i].nId, + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES + }); + //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений + await this.logger.error(sErr, { nQueueId: outMsgs[i].nId }); + } + } + } + } + //Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений + await this.restartDetectingLoop(); + } catch (e) { + //Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку в протоколе работы сервера приложений + await this.logger.error(sErr); + await this.restartDetectingLoop(); + } + } else { + //Нет свободных обработчиков - ждём и перезапускаем цикл опроса + await this.restartDetectingLoop(); + } + } + //Запуск обработки очереди исходящих сообщений + startProcessing(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.startProcessing, + "Параметры функции запуска обработки очереди исходящих сообщений" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Выставляем флаг работы + this.bWorking = true; + //запоминаем список обслуживаемых сервисов + this.services = prms.services; + //Начинаем слушать очередь исходящих + setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY); + //И оповещаем всех что запустились + this.notifyStarted(); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Остановка обработки очереди исходящих сообщений + stopProcessing() { + //Выставляем флаг неработы + this.bWorking = false; + //Останавливаем опрос очереди + if (this.nDetectingLoopTimeOut) { + clearTimeout(this.nDetectingLoopTimeOut); + this.nDetectingLoopTimeOut = null; + } + //Ждем завершения работы всех обработчиков + let i = setInterval(() => { + if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) { + clearInterval(i); + this.notifyStopped(); + } + }, NWORKERS_WAIT_INTERVAL); + } +} + +//----------------- +// Интерфейс модуля +//----------------- + +exports.SEVT_OUT_QUEUE_STARTED = SEVT_OUT_QUEUE_STARTED; +exports.SEVT_OUT_QUEUE_STOPPED = SEVT_OUT_QUEUE_STOPPED; +exports.OutQueue = OutQueue; diff --git a/core/out_queue_processor2.js b/core/out_queue_processor2.js new file mode 100644 index 0000000..193f4a6 --- /dev/null +++ b/core/out_queue_processor2.js @@ -0,0 +1,381 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Модуль ядра: обработчик исходящего сообщения +*/ + +//---------------------- +// Подключение библиотек +//---------------------- + +require("module-alias/register"); //Поддержка псевонимов при подключении модулей +const _ = require("lodash"); //Работа с массивами и коллекциями +const lg = require("./logger"); //Протоколирование работы +const db = require("./db_connector"); //Взаимодействие с БД +const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции +const { ServerError } = require("./server_errors"); //Типовая ошибка +const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений +const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля +const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди +const { + SERR_UNEXPECTED, + SERR_MODULES_BAD_INTERFACE, + SERR_OBJECT_BAD_INTERFACE, + SERR_MODULES_NO_MODULE_SPECIFIED +} = require("./constants"); //Глобальные константы +const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД + +//---------- +// Константы +//---------- + +//-------------------------- +// Глобальные идентификаторы +//-------------------------- + +let dbConn = null; //Подключение к БД +let logger = null; //Протоколирование работы + +//------------ +// Тело модуля +//------------ + +//Отправка родительскому процессу ошибки обработки сообщения сервером приложений +const sendErrorResult = sMessage => { + process.send({ + sExecResult: "ERR", + sExecMsg: sMessage + }); +}; + +//Отправка родительскому процессу успеха обработки сообщения сервером приложений +const sendOKResult = () => { + process.send({ + sExecResult: "OK", + sExecMsg: null + }); +}; + +//Запись в файл !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!! +const writeToFile = queue => { + return new Promise((resolve, reject) => { + const fs = require("fs"); + fs.writeFile("c:/repos/temp/" + queue.nId, queue.blMsg, err => { + if (err) { + reject(new ServerError(SERR_UNEXPECTED, `Ошибка отработки сообщения ${prms.queue.nId}`)); + } else { + resolve(); + } + }); + }); +}; + +//Запуск обработки сообщения сервером приложений +const appProcess = async prms => { + //Обработанное сообщение + let newQueue = null; + //Обрабатываем + try { + //Фиксируем начало исполнения сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + }); + //Скажем что начали обработку + await logger.info( + `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ + prms.queue.sServiceFnCode + }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + { nQueueId: prms.queue.nId } + ); + if (prms.queue.blMsg) { + await writeToFile(prms.queue); + let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId; + //Фиксируем успех исполнения + newQueue = await dbConn.setQueueAppSrvResult({ + nQueueId: prms.queue.nId, + blMsg: new Buffer(sMsg), + blResp: new Buffer("REPLAY ON " + prms.queue.nId) + }); + //Фиксируем успешное исполнение сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { + nQueueId: prms.queue.nId + }); + } else { + throw new ServerError( + SERR_UNEXPECTED, + `Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки` + ); + } + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку обработки сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса + await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${sErr}`, { + nQueueId: prms.queue.nId + }); + } + //Возвращаем результат + return newQueue; +}; + +//Запуск обработки сообщения сервером БД +const dbProcess = async prms => { + //Проверяем структуру переданного объекта для старта + //let sCheckResult = validateObject( + // prms, + // prmsOutQueueSchema.dbProcess, + // "Параметры функции запуска обработки ообщения сервером БД" + //); + //Если структура объекта в норме + //if (!sCheckResult) { + //Обрабатываем + try { + //Фиксируем начало исполнения сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB + }); + //Скажем что начали обработку + await logger.info( + `Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ + prms.queue.sServiceFnCode + }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + { nQueueId: prms.queue.nId } + ); + //Вызов обработчика БД + await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); + //Фиксируем успешное исполнение сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { + nQueueId: prms.queue.nId + }); + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`, { + nQueueId: prms.queue.nId + }); + } + //} else { + // throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //} +}; + +//Протоколирование предупреждения о ненадлежащем статусе сообщения +const warnBadStateForProcess = async prms => { + //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) + await logger.warn(`Cообщение ${prms.queue.nId} в статусе ${prms.queue.sExecState} попало в очередь обработчика`, { + nQueueId: prms.queue.nId + }); +}; + +//Обработка задачи +const processTask = async prms => { + //Проверяем параметры + /* + let sCheckResult = validateObject( + prms, + prmsOutQueueProcessorSchema.processTask, + "Параметры функции обработки задачи" + ); + */ + //Если параметры в норме + //if (!sCheckResult) { + let q = null; + try { + //Создаём подключение к БД + dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings }); + //Создаём логгер для протоколирования работы + logger = new lg.Logger(); + //Подключим логгер к БД (и отключим когда надо) + dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => { + logger.setDBConnector(dbConn, true); + }); + dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => { + logger.removeDBConnector(); + }); + //Подключаемся к БД + await dbConn.connect(); + //Считываем запись очереди + q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId }); + //Далее работаем от статуса считанной записи + switch (q.nExecState) { + //Поставлено в очередь + case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { + //Запускаем обработку сервером приложений + let res = await appProcess({ queue: q }); + //И если она успешно завершилась - обработку сервоером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res }); + break; + } + //Обрабатывается сервером приложений + case objQueueSchema.NQUEUE_EXEC_STATE_APP: { + //Ничего не делаем, но предупредим что так быть не должно + await warnBadStateForProcess({ queue: q }); + break; + } + //Ошибка обработки сервером приложений + case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: { + //Если ещё есть попытки отработки + if (q.nExecCnt < q.nRetryAttempts) { + //Снова запускаем обработку сервером приложений + let res = await appProcess({ queue: q }); + //И если она успешно завершилась - обработку сервоером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res }); + } else { + //Попыток нет - финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: q.sExecMsg, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + } + break; + } + //Успешно обработано сервером приложений + case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: { + //Запускаем обработку в БД + await dbProcess({ queue: q }); + break; + } + //Обрабатывается в БД + case objQueueSchema.NQUEUE_EXEC_STATE_DB: { + //Ничего не делаем, но предупредим что так быть не должно + await warnBadStateForProcess({ queue: q }); + break; + } + //Ошибка обработки в БД + case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: { + //Если ещё есть попытки отработки + if (q.nExecCnt < q.nRetryAttempts) { + //Снова запускаем обработку сервером БД + await dbProcess({ queue: q }); + } else { + //Попыток нет - финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: q.sExecMsg, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + } + break; + } + //Успешно обработано в БД + case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: { + //Финализируем + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: null, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + break; + } + //Обработано с ошибками + case objQueueSchema.NQUEUE_EXEC_STATE_ERR: { + //Ничего не делаем, но предупредим что так быть не должно + await warnBadStateForProcess({ queue: q }); + break; + } + //Обработано успешно + case objQueueSchema.NQUEUE_EXEC_STATE_OK: { + //Ничего не делаем, но предупредим что так быть не должно + await warnBadStateForProcess({ queue: q }); + break; + } + default: { + //Ничего не делаем + break; + } + } + //Отключаемся от БД + if (dbConn) await dbConn.disconnect(); + //Отправляем успех + sendOKResult(); + } catch (e) { + //Отключаемся от БД + if (dbConn) await dbConn.disconnect(); + //Отправляем ошибку + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + sendErrorResult(sErr); + } + //Отправим родителю информацию о том, что закончили обработку + + //} else { + // sendErrorResult({ sMessage: sCheckResult }); + //} +}; + +//--------------------------------- +// Управление процессом обработчика +//--------------------------------- + +//Перехват CTRL + C (останов процесса) +process.on("SIGINT", () => {}); + +//Перехват CTRL + \ (останов процесса) +process.on("SIGQUIT", () => {}); + +//Перехват мягкого останова процесса +process.on("SIGTERM", () => {}); + +//Перехват ошибок +process.on("uncaughtException", e => { + //Отправляем ошибку родительскому процессу + sendErrorResult(e.message); +}); + +//Приём сообщений +process.on("message", task => { + //Проверяем структуру переданного сообщения + /* + let sCheckResult = validateObject( + task, + objOutQueueProcessorSchema.OutQueueProcessorTask, + "Задача обработчика очереди исходящих сообщений" + ); + */ + //Если структура объекта в норме + //if (!sCheckResult) { + //Запускаем обработку + processTask({ task }); + //} else { + // throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //} +});