diff --git a/core/constants.js b/core/constants.js index 05199a0..d7b0713 100644 --- a/core/constants.js +++ b/core/constants.js @@ -15,6 +15,7 @@ exports.SMODULES_PATH_MODELS = "@models"; //Модели данных и схе //Типовые коды ошибок exports.SERR_COMMON = "ERR_COMMON"; //Общая ошибка exports.SERR_UNEXPECTED = "ERR_UNEXPECTED"; //Неожиданная ошибка +exports.SERR_UNAUTH = "ERR_UNAUTH"; //Отсутствие аутентификации //Типовые коды ошибок подключения модулей exports.SERR_MODULES_NO_MODULE_SPECIFIED = "ERR_MODULES_NO_MODULE_SPECIFIED"; //Не указан подключаемый модуль diff --git a/core/out_queue.js b/core/out_queue.js index 6c73b99..f36810f 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -195,6 +195,9 @@ class OutQueue extends EventEmitter { const self = this; //Запоминаем текущее количество попыток обработки const nQueueOldExecCnt = prms.queue.nExecCnt; + //Буфер для ошибок (для журнала работы и очереди обмена) + let sErrorLog = null; + let sError = null; //Создаём новый обработчик сообщений const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); //Перехват сообщений обработчика @@ -211,40 +214,30 @@ class OutQueue extends EventEmitter { if (!sCheckResult) { //Анализируем результат обработки - если ошибка - фиксируем if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) { - //Фиксируем ошибку обработки - протокол работы сервиса - await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, { - nQueueId: prms.queue.nId - }); - //Фиксируем ошибку обработки - статус сообщения - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: result.sMsg, - nIncExecCnt: - nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: - (nQueueOldExecCnt == prms.queue.nExecCnt - ? prms.queue.nExecCnt + 1 - : prms.queue.nExecCnt) < prms.queue.nRetryAttempts - ? prms.queue.nExecState - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); + //Запоминаем ошибку обработчика + sErrorLog = `Ошибка обработки исходящего сообщения: ${result.sMsg}`; + sError = result.sMsg; } else { - //Ошибки нет, но если есть контекст для сервиса - сохраним его для дальнейшего использования - if (!_.isUndefined(result.context)) { - let tmpSrv = _.find(this.services, { nId: prms.queue.nServiceId }); - tmpSrv.context = _.cloneDeep(result.context); + //Ошибки обработки нет, но может быть есть ошибка аутентификации + if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_UNAUTH) { + //!!!!!!!!!!!!!! + //?????????????? + //!!!!!!!!!!!!!! } } } else { - //Пришел неожиданный ответ обработчика - запись в протокол работы сервиса - await self.logger.error( - `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, - { nQueueId: prms.queue.nId } - ); - //Фиксируем ошибку обработки - статус сообщения + //Пришел неожиданный ответ обработчика + sErrorLog = `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`; + sError = sCheckResult; + } + //Фиксируем ошибки, если есть + if (sError) { + //Запись в протокол работы сервиса + await self.logger.error(sErrorLog, { nQueueId: prms.queue.nId }); + //Запись в статус сообщения await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, - sExecMsg: `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, + sExecMsg: sError, nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nExecState: (nQueueOldExecCnt == prms.queue.nExecCnt diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index ba6d41c..dc5a2a5 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -17,8 +17,14 @@ const { ServerError } = require("./server_errors"); //Типовая ошибк const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди +const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса -const { SERR_OBJECT_BAD_INTERFACE, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER } = require("./constants"); //Глобальные константы +const { + SERR_OBJECT_BAD_INTERFACE, + SERR_APP_SERVER_BEFORE, + SERR_APP_SERVER_AFTER, + SERR_UNAUTH +} = require("./constants"); //Глобальные константы const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД //-------------------------- @@ -44,40 +50,36 @@ const sendErrorResult = prms => { if (!sCheckResult) { process.send({ sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, - sMsg: prms.sMessage, - context: null + sMsg: prms.sMessage }); } else { process.send({ sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, - sMsg: sCheckResult, - context: null + sMsg: sCheckResult }); } }; //Отправка родительскому процессу успеха обработки сообщения сервером приложений -const sendOKResult = prms => { - //Проверяем структуру переданного сообщения - let sCheckResult = validateObject( - prms, - prmsOutQueueProcessorSchema.sendOKResult, - "Параметры функции отправки родительскому процессу успеха обработки сообщения" - ); - //Если структура объекта в норме - if (!sCheckResult) { - process.send({ - sResult: objOutQueueProcessorSchema.STASK_RESULT_OK, - sMsg: null, - context: prms.context - }); - } else { - sendErrorResult({ sMessage: sCheckResult }); - } +const sendOKResult = () => { + process.send({ + sResult: objOutQueueProcessorSchema.STASK_RESULT_OK, + sMsg: null + }); +}; + +//Отправка родительскому процессу успеха обработки сообщения сервером приложений +const sendUnAuthResult = () => { + process.send({ + sResult: objOutQueueProcessorSchema.STASK_RESULT_UNAUTH, + sMsg: null + }); }; //Запуск обработки сообщения сервером приложений const appProcess = async prms => { + //Результат обработки - объект Queue (обработанное сообщение) или ServerError (ошибка обработки) + let res = null; //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, @@ -86,145 +88,157 @@ const appProcess = async prms => { ); //Если структура объекта в норме if (!sCheckResult) { - //Обработанное сообщение - let newQueue = null; //Обрабатываем try { - //Фиксируем начало исполнения сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP - }); - //Скажем что начали обработку - await logger.info( - `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ - prms.queue.sServiceFnCode - }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, - { nQueueId: prms.queue.nId } - ); - //Считаем тело сообщения - let qData = await dbConn.getQueueMsg({ nQueueId: prms.queue.nId }); - //Кладём данные тела в объект сообщения и инициализируем поле для ответа - _.extend(prms.queue, { blMsg: qData.blMsg, blResp: null }); - //Собираем параметры для передачи серверу - let options = { method: prms.service.sFnPrmsType }; - //Определимся с URL и телом сообщения в зависимости от способа передачи параметров - if (prms.service.sFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { - options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL }); - options.body = prms.queue.blMsg; - } else { - options.url = buildURL({ - sSrvRoot: prms.service.sSrvRoot, - sFnURL: prms.function.sFnURL, - sQuery: prms.queue.blMsg.toString() + //Считываем статус аутентификации сервиса + let isServiceAuth = await dbConn.isServiceAuth({ nServiceId: prms.service.nId }); + //Проверяем аутентификацию + if ( + prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_NO || + (prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_YES && + isServiceAuth == objServiceSchema.NIS_AUTH_YES) + ) { + //Фиксируем начало исполнения сервером приложений - в статусе сообщения + res = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP }); - } - //Выполняем обработчик "До" (если он есть) - if (prms.function.sAppSrvBefore) { - const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); - let resBefore = null; - try { - let resBeforePrms = _.cloneDeep(prms); - resBefore = await fnBefore(resBeforePrms); - } catch (e) { - throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); + //Фиксируем начало исполнения сервером приложений - в протоколе работы сервиса + await logger.info( + `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ + prms.queue.sServiceFnCode + }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + { nQueueId: prms.queue.nId } + ); + //Считаем тело сообщения + let qData = await dbConn.getQueueMsg({ nQueueId: prms.queue.nId }); + //Считаем контекст сервиса + let serviceCtx = await dbConn.getServiceContext({ nServiceId: prms.service.nId }); + //Кладём данные тела в объект сообщения и инициализируем поле для ответа + _.extend(prms.queue, { blMsg: qData.blMsg, blResp: null }); + //Кладём данные контекста в сервис + _.extend(prms.service, serviceCtx); + //Собираем параметры для передачи серверу + let options = { method: prms.service.sFnPrmsType }; + //Определимся с URL и телом сообщения в зависимости от способа передачи параметров + if (prms.service.sFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { + options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL }); + options.body = prms.queue.blMsg; + } else { + options.url = buildURL({ + sSrvRoot: prms.service.sSrvRoot, + sFnURL: prms.function.sFnURL, + sQuery: prms.queue.blMsg.toString() + }); } - //Проверяем структуру ответа функции предобработки - if (resBefore) { - let sCheckResult = validateObject( - resBefore, - objOutQueueProcessorSchema.OutQueueProcessorFnBefore, - "Результат функции предобработки исходящего сообщения" - ); - //Если структура ответа в норме - if (!sCheckResult) { - //Применим её - if (!_.isUndefined(resBefore.options)) options = _.cloneDeep(resBefore.options); - if (!_.isUndefined(resBefore.blMsg)) { - prms.queue.blMsg = resBefore.blMsg; - await dbConn.setQueueMsg({ - nQueueId: prms.queue.nId, - blMsg: prms.queue.blMsg - }); - if (prms.service.sFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { - options.body = prms.queue.blMsg; - } else { - options.url = buildURL({ - sSrvRoot: prms.service.sSrvRoot, - sFnURL: prms.function.sFnURL, - sQuery: prms.queue.blMsg.toString() + //Выполняем обработчик "До" (если он есть) + if (prms.function.sAppSrvBefore) { + const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); + let resBefore = null; + try { + let resBeforePrms = _.cloneDeep(prms); + resBefore = await fnBefore(resBeforePrms); + } catch (e) { + throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); + } + //Проверяем структуру ответа функции предобработки + if (resBefore) { + let sCheckResult = validateObject( + resBefore, + objOutQueueProcessorSchema.OutQueueProcessorFnBefore, + "Результат функции предобработки исходящего сообщения" + ); + //Если структура ответа в норме + if (!sCheckResult) { + //Применим её + if (!_.isUndefined(resBefore.blMsg)) { + prms.queue.blMsg = resBefore.blMsg; + await dbConn.setQueueMsg({ + nQueueId: prms.queue.nId, + blMsg: prms.queue.blMsg + }); + if (prms.service.sFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { + options.body = prms.queue.blMsg; + } else { + options.url = buildURL({ + sSrvRoot: prms.service.sSrvRoot, + sFnURL: prms.function.sFnURL, + sQuery: prms.queue.blMsg.toString() + }); + } + } + if (!_.isUndefined(resBefore.options)) options = _.cloneDeep(resBefore.options); + if (!_.isUndefined(resBefore.bUnAuth)) + throw new ServerError(SERR_UNAUTH, "Не аутентифицирован"); + } else { + //Или расскажем об ошибке в структуре ответа + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + } + //Фиксируем отправку сообщения в протоколе работы сервиса + await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, { + nQueueId: prms.queue.nId + }); + //Отправляем сообщение удалённому серверу + let serverResp = await rqp(options); + //Сохраняем полученный ответ + prms.queue.blResp = new Buffer(serverResp); + await dbConn.setQueueResp({ + nQueueId: prms.queue.nId, + blResp: prms.queue.blResp + }); + //Выполняем обработчик "После" (если он есть) + if (prms.function.sAppSrvAfter) { + const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); + let resAfter = null; + try { + let resAfterPrms = _.cloneDeep(prms); + resAfter = await fnAfter(resAfterPrms); + } catch (e) { + throw new ServerError(SERR_APP_SERVER_AFTER, e.message); + } + //Проверяем структуру ответа функции постобработки + if (resAfter) { + let sCheckResult = validateObject( + resAfter, + objOutQueueProcessorSchema.OutQueueProcessorFnAfter, + "Результат функции постобработки исходящего сообщения" + ); + //Если структура ответа в норме + if (!sCheckResult) { + //Применим её + if (!_.isUndefined(resAfter.blResp)) { + prms.queue.blResp = resAfter.blResp; + await dbConn.setQueueResp({ + nQueueId: prms.queue.nId, + blResp: prms.queue.blResp }); } + if (!_.isUndefined(resAfter.bUnAuth)) + throw new ServerError(SERR_UNAUTH, "Не аутентифицирован"); + } else { + //Или расскажем об ошибке в структуре ответа + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } - if (!_.isUndefined(resBefore.context)) prms.service.context = _.cloneDeep(resBefore.context); - } else { - //Или расскажем об ошибке - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } - } - //Фиксируем отправку сообщения в протоколе работы сервиса - await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, { - nQueueId: prms.queue.nId - }); - //Отправляем сообщение удалённому серверу - let serverResp = await rqp(options); - //Сохраняем полученный ответ - _.extend(prms, { serverResp }); - await dbConn.setQueueResp({ - nQueueId: prms.queue.nId, - blResp: new Buffer(prms.serverResp) - }); - //Выполняем обработчик "После" (если он есть) - if (prms.function.sAppSrvAfter) { - const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); - let resAfter = null; - try { - let resAfterPrms = _.cloneDeep(prms); - resAfter = await fnAfter(resAfterPrms); - } catch (e) { - throw new ServerError(SERR_APP_SERVER_AFTER, e.message); - } - //Проверяем структуру ответа функции постобработки - if (resAfter) { - let sCheckResult = validateObject( - resAfter, - objOutQueueProcessorSchema.OutQueueProcessorFnAfter, - "Результат функции постобработки исходящего сообщения" - ); - //Если структура ответа в норме - if (!sCheckResult) { - //Применим её - if (!_.isUndefined(resAfter.blResp)) prms.queue.blResp = resAfter.blResp; - if (!_.isUndefined(resAfter.context)) prms.service.context = _.cloneDeep(resAfter.context); - } else { - //Или расскажем об ошибке - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - } - } else { - prms.queue.blResp = new Buffer(serverResp.toString()); - } + //Фиксируем успешное исполнение сервером приложений - в статусе сообщения + res = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { + nQueueId: prms.queue.nId + }); } else { - prms.queue.blResp = new Buffer(serverResp.toString()); + //Нет атуентификации (мы ещё не меняли статус сообщения и это не считается за попытку исполнения, это будет просто сигнал, что надо аутентифицироваться а потом задача снова попадёт в очередь) + res = new ServerError(SERR_UNAUTH, "Не аутентифицирован"); } - //Фиксируем успех исполнения - newQueue = await dbConn.setQueueAppSrvResult({ - nQueueId: prms.queue.nId, - blMsg: prms.queue.blMsg, - blResp: prms.queue.blResp - }); - //Фиксируем успешное исполнение сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK - }); - //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса - await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { - nQueueId: prms.queue.nId - }); } catch (e) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ + res = await dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: makeErrorText(e), nIncExecCnt: NINC_EXEC_CNT_YES, @@ -239,15 +253,18 @@ const appProcess = async prms => { { nQueueId: prms.queue.nId } ); } - //Возвращаем результат - return newQueue; } else { - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Фатальная ошибка обработки - некорректный объект параметров + res = new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } + //Возвращаем результат + return res; }; //Запуск обработки сообщения сервером БД const dbProcess = async prms => { + //Результат обработки - объект Queue (обработанное сообщение) или ServerError (ошибка обработки) + let res = null; //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, @@ -259,11 +276,11 @@ const dbProcess = async prms => { //Обрабатываем try { //Фиксируем начало исполнения сервером БД - в статусе сообщения - await dbConn.setQueueState({ + res = await dbConn.setQueueState({ nQueueId: prms.queue.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB }); - //Скажем что начали обработку + //Фиксируем начало исполнения сервером БД - в протоколе работы сервиса await logger.info( `Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ prms.queue.sServiceFnCode @@ -271,9 +288,9 @@ const dbProcess = async prms => { { nQueueId: prms.queue.nId } ); //Вызов обработчика БД - await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); - //Фиксируем успешное исполнение сервером БД - в статусе сообщения - await dbConn.setQueueState({ + res = await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); + //Фиксируем успешное исполнение (полное - дальше обработки нет) - в статусе сообщения + res = await dbConn.setQueueState({ nQueueId: prms.queue.nId, nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK @@ -300,8 +317,11 @@ const dbProcess = async prms => { ); } } else { - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Фатальная ошибка обработки - некорректный объект параметров + res = new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } + //Возвращаем результат + return res; }; //Обработка задачи @@ -331,101 +351,50 @@ const processTask = async prms => { await dbConn.connect(); //Считываем запись очереди q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId }); - //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Проверяем аутентификацию - //if(prms.task.function.) + //Выставим флаг - нет ошибок аутентификации на удаленном сервере + let bUnAuthFlag = false; //Далее работаем от статуса считанной записи switch (q.nExecState) { - //Поставлено в очередь - case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { - //Запускаем обработку сервером приложений - try { - let res = await appProcess({ - queue: q, - service: prms.task.service, - function: prms.task.function - }); - //И если она успешно завершилась - обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { - try { - await dbProcess({ queue: res }); - } catch (e) { - //Фиксируем ошибку обработки сервером БД - в статусе сообщения - await dbConn.setQueueState({ - nQueueId: res.nId, - sExecMsg: makeErrorText(e), - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: - res.nExecCnt + 1 < res.nRetryAttempts - ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса - await logger.error( - `Ошибка обработки исходящего сообщения ${res.nId} сервером БД: ${makeErrorText(e)}`, - { nQueueId: res.nId } - ); - } - } - } catch (e) { - //Фиксируем ошибку обработки сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: makeErrorText(e), - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: - q.nExecCnt + 1 < q.nRetryAttempts - ? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса - await logger.error( - `Ошибка обработки исходящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`, - { nQueueId: q.nId } - ); - } - break; - } - //Обрабатывается сервером приложений - case objQueueSchema.NQUEUE_EXEC_STATE_APP: { - //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) - await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { - nQueueId: q.nId - }); - break; - } - //Ошибка обработки сервером приложений + //Статусы "Поставлено в очередь" или "Ошибка обработки сервером приложений" + case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: { - //Если ещё есть попытки отработки - if (q.nExecCnt < q.nRetryAttempts) { - //Снова запускаем обработку сервером приложений + //Если ещё не обрабатывали или есть ещё попытки отработки + if (q.nExecCnt == 0 || q.nExecCnt < q.nRetryAttempts) { + //Запускаем обработку сервером приложений try { let res = await appProcess({ queue: q, service: prms.task.service, function: prms.task.function }); - //И если она успешно завершилась - обработку сервоером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { - try { - await dbProcess({ queue: res }); - } catch (e) { - //Фиксируем ошибку обработки сервером БД - в статусе сообщения - await dbConn.setQueueState({ - nQueueId: res.nId, - sExecMsg: makeErrorText(e), - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: - res.nExecCnt + 1 < res.nRetryAttempts - ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса - await logger.error( - `Ошибка обработки исходящего сообщения ${res.nId} сервером БД: ${makeErrorText( - e - )}`, - { nQueueId: res.nId } - ); + //Если результат обработки - ошибка аутентификации + if (res === objOutQueueProcessorSchema.STASK_RESULT_UNAUTH) { + //Выставим флаг, который будет указывать на ошибку аутентификации + bUnAuthFlag = true; + } else { + //Нет такой ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + try { + await dbProcess({ queue: res }); + } catch (e) { + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: res.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + res.nExecCnt + 1 < res.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${ + res.nId + } сервером БД: ${makeErrorText(e)}`, + { nQueueId: res.nId } + ); + } } } } catch (e) { @@ -458,42 +427,11 @@ const processTask = async prms => { } break; } - //Успешно обработано сервером приложений - case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: { - //Запускаем обработку в БД - try { - await dbProcess({ queue: q }); - } catch (e) { - //Фиксируем ошибку обработки сервером БД - в статусе сообщения - await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: makeErrorText(e), - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: - q.nExecCnt + 1 < q.nRetryAttempts - ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса - await logger.error( - `Ошибка обработки исходящего сообщения ${q.nId} сервером БД: ${makeErrorText(e)}`, - { nQueueId: q.nId } - ); - } - break; - } - //Обрабатывается в БД - case objQueueSchema.NQUEUE_EXEC_STATE_DB: { - //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) - await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { - nQueueId: q.nId - }); - break; - } - //Ошибка обработки в БД + //Статусы "Успешно обработано сервером приложений" и "Ошибка обработки в БД" + case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: { //Если ещё есть попытки отработки - if (q.nExecCnt < q.nRetryAttempts) { + if (q.nExecCnt == 0 || q.nExecCnt < q.nRetryAttempts) { //Снова запускаем обработку сервером БД try { await dbProcess({ queue: q }); @@ -536,15 +474,10 @@ const processTask = async prms => { }); break; } - //Обработано с ошибками - case objQueueSchema.NQUEUE_EXEC_STATE_ERR: { - //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) - await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { - nQueueId: q.nId - }); - break; - } - //Обработано успешно + //Статусы "Обрабатывается сервером приложений", "Обрабатывается в БД", "Обработано с ошибками", "Обработано успешно" + case objQueueSchema.NQUEUE_EXEC_STATE_APP: + case objQueueSchema.NQUEUE_EXEC_STATE_DB: + case objQueueSchema.NQUEUE_EXEC_STATE_ERR: case objQueueSchema.NQUEUE_EXEC_STATE_OK: { //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { @@ -552,6 +485,7 @@ const processTask = async prms => { }); break; } + //Неипонятный статус default: { //Ничего не делаем break; @@ -559,8 +493,9 @@ const processTask = async prms => { } //Отключаемся от БД if (dbConn) await dbConn.disconnect(); - //Отправляем успех - sendOKResult({ context: prms.task.service.context }); + //Отправляем успех или ошибку аутентификации + if (bUnAuthFlag) sendUnAuthResult(); + else sendOKResult(); } catch (e) { //Отключаемся от БД if (dbConn) await dbConn.disconnect(); diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index fba1c8e..00327f9 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -19,6 +19,7 @@ const { ServiceFunction } = require("./obj_service_function"); //Схема ва //Состояния обработки сообщений очереди обмена const STASK_RESULT_OK = "OK"; //Обработано успешно const STASK_RESULT_ERR = "ERR"; //Обработано с ошибками +const STASK_RESULT_UNAUTH = "UNAUTH"; //Не обработано из-за отсутсвия аутентификации //------------------ // Интерфейс модуля @@ -27,6 +28,7 @@ const STASK_RESULT_ERR = "ERR"; //Обработано с ошибками //Константы exports.STASK_RESULT_OK = STASK_RESULT_OK; exports.STASK_RESULT_ERR = STASK_RESULT_ERR; +exports.STASK_RESULT_UNAUTH = STASK_RESULT_UNAUTH; //Схема валидации задачи обработчику очереди исходящих сообщений exports.OutQueueProcessorTask = new Schema({ @@ -71,7 +73,7 @@ exports.OutQueueProcessorTaskResult = new Schema({ //Состояние обработки сообщения очереди обмена sResult: { type: String, - enum: [STASK_RESULT_OK, STASK_RESULT_ERR], + enum: [STASK_RESULT_OK, STASK_RESULT_ERR, STASK_RESULT_UNAUTH], required: true, message: { type: path => @@ -89,15 +91,6 @@ exports.OutQueueProcessorTaskResult = new Schema({ `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})` } - }, - //Контекст работы сервиса - context: { - type: Object, - required: true, - message: { - type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, - required: path => `Не указан контекст работы сервиса (${path})` - } } }).validator({ required: val => typeof val != "undefined" @@ -125,13 +118,14 @@ exports.OutQueueProcessorFnBefore = new Schema({ required: path => `Не указано обработанное сообщение очереди (${path})` } }, - //Контекст работы сервиса - context: { - type: Object, + //Флаг ошибки аутентификации на удаленном сервисе + bUnAuth: { + type: Boolean, required: false, message: { - type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, - required: path => `Не указан контекст работы сервиса (${path})` + type: path => + `Флаг ошибки аутентификации на удаленном сервисе (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан флаг ошибки аутентификации на удаленном сервисе (${path})` } } }); @@ -148,13 +142,14 @@ exports.OutQueueProcessorFnAfter = new Schema({ required: path => `Не указан результат обработки ответа удалённого сервиса (${path})` } }, - //Контекст работы сервиса - context: { - type: Object, + //Флаг ошибки аутентификации на удаленном сервисе + bUnAuth: { + type: Boolean, required: false, message: { - type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, - required: path => `Не указан контекст работы сервиса (${path})` + type: path => + `Флаг ошибки аутентификации на удаленном сервисе (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан флаг ошибки аутентификации на удаленном сервисе (${path})` } } }); diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js index e8aef4f..17f27b8 100644 --- a/models/prms_out_queue_processor.js +++ b/models/prms_out_queue_processor.js @@ -24,21 +24,8 @@ exports.sendErrorResult = new Schema({ type: String, required: true, message: { - type: path => `Идентификатор сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`, - required: path => `Не указан идентификатор сервиса (${path})` - } - } -}); - -//Схема валидации параметров функции отправки успеха обработки -exports.sendOKResult = new Schema({ - //Контекст работы сервиса - context: { - type: Object, - required: true, - message: { - type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, - required: path => `Не указан контекст работы сервиса (${path})` + type: path => `Сообщение об ошибке (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указано сообщение об ошибке (${path})` } } });