diff --git a/core/out_queue.js b/core/out_queue.js index 00bc4d5..a821c27 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -13,7 +13,7 @@ const ChildProcess = require("child_process"); //Работа с дочерни const { ServerError } = require("./server_errors"); //Типовая ошибка const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const { validateObject } = require("./utils"); //Вспомогательные функции -const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса @@ -76,13 +76,118 @@ class OutQueue extends EventEmitter { //оповестим подписчиков о появлении нового отчета this.emit(SEVT_OUT_QUEUE_STOPPED); } - //Останов обработчика сообщения - stopMessageWorker(worker) { - worker.kill(); - this.nWorkersLeft++; + //Установка финальных статусов сообщения в БД + async finalise(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.finalise, + "Параметры функции установки финальных статусов сообщения в БД" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Если больше нет попыток исполнения и сообщение не в статусе успешной обработки сервером БД + if ( + prms.queue.nExecState != objQueueSchema.NQUEUE_EXEC_STATE_DB_OK && + prms.queue.nExecCnt >= prms.queue.nRetryAttempts + ) { + //То считаем, что оно выполнено с ошибками и больше пытаться не надо + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: prms.queue.sExecMsg, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + } else { + //Если сообщение успешно исполнено сервером БД - то значит оно успешно исполнено вообще + if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_OK) { + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + } else { + //Если сообщение в статусе исполнения сервером приложений (чего здесь быть не может) - это ошибка + if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP) { + //То выставим ему ошибку исполнения сервером приложений + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: prms.queue.sExecMsg, + nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + }); + } else { + //Если сообщение в статусе исполнения сервером БД (чего здесь быть не может) - это ошибка + if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB) { + //То выставим ему ошибку исполнения сервером БД + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: prms.queue.sExecMsg, + nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + }); + } + } + } + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Запуск обработки сообщения сервером БД + async dbProcess(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.dbProcess, + "Параметры функции запуска обработки ообщения сервером БД" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Буфер для текущего состояния сообщения + let curQueue = null; + //Обрабатываем + try { + //Фиксируем начало исполнения сервером БД - в статусе сообщения + curQueue = await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB + }); + //Вызов обработчика БД + curQueue = await this.dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); + //Фиксируем успешное исполнение сервером БД - в статусе сообщения + curQueue = await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK + }); + //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса + await this.logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { + nQueueId: prms.queue.nId + }); + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + curQueue = await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await this.logger.error( + `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`, + { nQueueId: prms.queue.nId } + ); + } + //Вернём текущее состоянии сообщения очереди + return curQueue; + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } } //Запуск обработки очередного сообщения - processMessage(prms) { + async processMessage(prms) { //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, @@ -97,6 +202,32 @@ class OutQueue extends EventEmitter { const self = this; //Создаём новый обработчик сообщений const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); + //Текущее состояние сообщения + let curQueue = null; + //Скажем что начали обработку + await self.logger.info( + `Обрабатываю исходящее сообщение: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ + prms.queue.sServiceFnCode + }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + { nQueueId: prms.queue.nId } + ); + //Установим его статус в БД - обрабатывается сервером приложений (только для новых или повторно обрабатываемых сервером приложений) + if ( + prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE || + prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + ) { + curQueue = await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + }); + } + //Установим его статус в БД - обрабатывается в БД (только если сюда пришло сообщение на повторную обработку сервером БД) + if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR) { + curQueue = await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB + }); + } //Перехват сообщений обработчика proc.on("message", async result => { //Проверяем структуру полученного сообщения @@ -107,100 +238,179 @@ class OutQueue extends EventEmitter { ); //Если структура сообщения в норме if (!sCheckResult) { - //Если обработчик вернул ошибку - if (result.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR) { - //Установим ошибочный статус в БД для сообщений и увеличим счетчик попыток отправки - await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: result.sExecMsg, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: result.nExecState - }); - //Фиксируем ошибку в протоколе работы сервиса - await self.logger.error( - `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ` + - result.sExecMsg, - { - nQueueId: prms.queue.nId + //Движение события по статусам в зависимости от того в каком состоянии его вернул обработчик + try { + //Работаем от статуса сообщения полученного от обработчика + switch (result.nExecState) { + //Ошибка обработки + case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: { + //Установим ошибочный статус в БД для сообщений и увеличим счетчик попыток отправки + curQueue = await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: result.sExecMsg, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: result.nExecState + }); + //Фиксируем ошибку в протоколе работы сервиса + await self.logger.error( + `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${ + result.sExecMsg + }`, + { nQueueId: prms.queue.nId } + ); + break; } - ); - } else { - //Пишем в базу успех - await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: result.sExecMsg, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: result.nExecState - }); - //Фиксируем успех в протоколе работы сервиса - await self.logger.info( - `Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, - { - nQueueId: prms.queue.nId + //Успех обработки + case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: { + //Если состояние менялось (а не просто повторная отработка) + if (result.nExecState != prms.queue.nExecState) { + //Пишем в базу успех отработки сервером приложений - результаты обработки + curQueue = await self.dbConn.setQueueAppSrvResult({ + nQueueId: prms.queue.nId, + blMsg: result.blMsg ? new Buffer(result.blMsg) : null, + blResp: result.blResp ? new Buffer(result.blResp) : null + }); + //Пишем в базу успех отработки сервером приложений - статус сообщения + curQueue = await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: result.nExecState + }); + //Пишем в базу успех отработки сервером приложений - запись в протокол работы сервера приложений + await self.logger.info( + `Исходящее сообщение ${ + prms.queue.nId + } успешно отработано сервером приложений`, + { nQueueId: prms.queue.nId } + ); + } + //Запускаем обработку сервером БД + curQueue = await self.dbProcess(prms); + break; } - ); + //Обработчик ничего не делал + default: { + //Обработчик ничего не делал, но если сообщение сообщение в статусе - ошибка обработки сервером БД или обрабатывается сервером БД, то запустим обработчик БД + if (result.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR) { + //Запускаем обработчик сервера БД + curQueue = await self.dbProcess(prms); + } else { + //Во всех остальных случаях - ничего не делаем вообще + curQueue = prms.queue; + } + break; + } + } + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку обработки сервером приложений - статус сообщения + curQueue = await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES + }); + //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений + await self.logger.error(sErr, { nQueueId: prms.queue.nId }); } } else { - //Пришел неожиданный ответ обработчика, установим статус в БД - ошибка обработки сервером приложений - await self.dbConn.setQueueState({ + //Пришел неожиданный ответ обработчика - статус сообщения + curQueue = await self.dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: sCheckResult, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + nIncExecCnt: NINC_EXEC_CNT_YES }); - //Фиксируем ошибку в протоколе работы сервиса + //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений await self.logger.error( `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, - { - nQueueId: prms.queue.nId - } + { nQueueId: prms.queue.nId } ); } - self.stopMessageWorker(proc); + //Выставляем финальные статусы + try { + await self.finalise({ queue: curQueue }); + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Установим его статус в БД - ошибка установки финального статуса + await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений + await self.logger.error(`Фатальная ошибка обработчика сообщения ${prms.queue.nId}: ${sErr}`, { + nQueueId: prms.queue.nId + }); + } + //Останавливаем обработчик и инкрементируем флаг их доступного количества + proc.kill(); + this.nWorkersLeft++; }); //Перехват ошибок обработчика proc.on("error", async e => { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; //Установим его статус в БД - ошибка обработки сервером приложений - await self.dbConn.setQueueState({ + let curQueue = await self.dbConn.setQueueState({ nQueueId: prms.queue.nId, - sExecMsg: e.message, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES }); //Так же фиксируем ошибку в протоколе работы - await self.logger.error(`Ошибка обработки исходящего сообщения сервером приложений: ${e.message}`, { + await self.logger.error(`Ошибка обработки исходящего сообщения сервером приложений: ${sErr}`, { nQueueId: prms.queue.nId }); - //Завершим обработчик - self.stopMessageWorker(proc); + //Выставляем финальные статусы + try { + await self.finalise({ queue: curQueue }); + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Установим его статус в БД - ошибка установки финального статуса + await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: sErr, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений + await self.logger.error(`Фатальная ошибка обработчика сообщения ${prms.queue.nId}: ${sErr}`, { + nQueueId: prms.queue.nId + }); + } + //Останавливаем обработчик и инкрементируем флаг их доступного количества + proc.kill(); + this.nWorkersLeft++; }); //Перехват останова обработчика proc.on("exit", code => {}); //Запускаем обработчик proc.send({ nQueueId: prms.queue.nId, - service: {}, - function: {}, - blMsg: prms.queue.blMsg + nExecState: prms.queue.nExecState, + blMsg: prms.queue.blMsg, + blResp: prms.queue.blResp, + service: _.find(this.services, { nId: prms.queue.nServiceId }), + function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { + nId: prms.queue.nServiceFnId + }) }); //Уменьшаем количество доступных обработчиков this.nWorkersLeft--; - //Вернем признак того, что сообщение обрабатывается - return true; - } else { - //Вернем признак того, что сообщение не обрабатывается - return false; } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Перезапуск опроса очереди исходящих сообщений - restartDetectingLoop() { + async restartDetectingLoop() { //Включаем опрос очереди только если установлен флаг работы if (this.bWorking) { - this.nDetectingLoopTimeOut = setTimeout(() => { - this.outDetectingLoop(); + this.nDetectingLoopTimeOut = await setTimeout(async () => { + await this.outDetectingLoop(); }, this.outGoing.nCheckTimeout); } } @@ -214,44 +424,58 @@ class OutQueue extends EventEmitter { let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft }); //Если есть сообщения if (Array.isArray(outMsgs) && outMsgs.length > 0) { - //Ставим их в очередь + //Обходим их for (let i = 0; i < outMsgs.length; i++) { - //Если сообщение успешно взято в обработку - if (this.processMessage({ queue: outMsgs[i] })) { - //Скажем что оно у нас есть - await this.logger.info( - "Исполняю отправку исходящего сообщения: " + - outMsgs[i].nId + - ", " + - outMsgs[i].sInDate + - ", " + - outMsgs[i].sServiceFnCode + - ", " + - outMsgs[i].sExecState + - ", попытка исполнения - " + - (outMsgs[i].nExecCnt + 1), - { nQueueId: outMsgs[i].nId } - ); - //Установим его статус в БД - обрабатывается сервером приложений - await this.dbConn.setQueueState({ + //И запускаем обработчики + try { + await this.processMessage({ queue: outMsgs[i] }); + } catch (e) { + //Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) + let curQueue = await this.dbConn.setQueueState({ nQueueId: outMsgs[i].nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + sExecMsg: sErr, + nIncExecCnt: NINC_EXEC_CNT_YES }); + //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений + await this.logger.error(sErr, { nQueueId: outMsgs[i].nId }); + //Выставляем финальные статусы + try { + await this.finalise({ queue: curQueue }); + } catch (e) { + //Сформируем текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Установим его статус в БД - ошибка установки финального статуса + await self.dbConn.setQueueState({ + nQueueId: outMsgs[i].nId, + sExecMsg: sErr, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений + await self.logger.error( + `Фатальная ошибка обработчика сообщения ${outMsgs[i].nId}: ${sErr}`, + { nQueueId: outMsgs[i].nId } + ); + } } } - } else { - await this.logger.info("Нет новых сообщений"); } - this.restartDetectingLoop(); + //Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений + await this.restartDetectingLoop(); } catch (e) { - if (e instanceof ServerError) - await this.logger.error("При обработке исходящего сообщения: " + e.sCode + ": " + e.sMessage); - else this.logger.error(SERR_UNEXPECTED + ": " + e.message); - this.restartDetectingLoop(); + //Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки + let sErr = `${SERR_UNEXPECTED}: ${e.message}`; + if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; + //Фиксируем ошибку в протоколе работы сервера приложений + await this.logger.error(sErr); + await this.restartDetectingLoop(); } } else { - await this.logger.info("Нет свободных обработчиков"); - this.restartDetectingLoop(); + //Нет свободных обработчиков - ждём и перезапускаем цикл опроса + await this.restartDetectingLoop(); } } //Запуск обработки очереди исходящих сообщений