From b09ca9e7d4802b16b0b6260aa215b71fd6809ffc Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Fri, 7 Dec 2018 13:55:25 +0300 Subject: [PATCH] =?UTF-8?q?=D0=92=D0=BD=D0=B5=D0=B4=D1=80=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=20=D0=B0=D0=B2=D1=82=D0=BE=D0=BD=D0=BE=D0=BC=D0=BD?= =?UTF-8?q?=D0=BE=D0=B3=D0=BE=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D1=87=D0=B8=D0=BA=D0=B0=20=D0=B8=D1=81=D1=85=D0=BE=D0=B4=D1=8F?= =?UTF-8?q?=D1=89=D0=B8=D1=85=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B9=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/app.js | 2 +- core/out_queue.js | 445 ++++++++++----------------- core/out_queue2.js | 291 ------------------ core/out_queue_processor.js | 468 +++++++++++++++++++++++------ core/out_queue_processor2.js | 381 ----------------------- models/obj_out_queue_processor.js | 150 ++------- models/prms_out_queue.js | 107 +++++-- models/prms_out_queue_processor.js | 54 +--- 8 files changed, 659 insertions(+), 1239 deletions(-) delete mode 100644 core/out_queue2.js delete mode 100644 core/out_queue_processor2.js diff --git a/core/app.js b/core/app.js index 41c06fa..568833d 100644 --- a/core/app.js +++ b/core/app.js @@ -9,7 +9,7 @@ const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД -const oq = require("./out_queue2"); //Прослушивание очереди исходящих сообщений +const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений const { ServerError } = require("./server_errors"); //Типовая ошибка const { validateObject } = require("./utils"); //Вспомогательные функции const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы diff --git a/core/out_queue.js b/core/out_queue.js index a821c27..661bcce 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -11,9 +11,9 @@ const _ = require("lodash"); //Работа с массивами и колле const EventEmitter = require("events"); //Обработчик пользовательских событий const ChildProcess = require("child_process"); //Работа с дочерними процессами const { ServerError } = require("./server_errors"); //Типовая ошибка -const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы -const { validateObject } = require("./utils"); //Вспомогательные функции -const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы +const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции +const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса @@ -60,6 +60,8 @@ class OutQueue extends EventEmitter { this.dbConn = prms.dbConn; //Запомним логгер this.logger = prms.logger; + //Список обрабатываемых в текущий момент сообщений очереди + this.inProgress = []; //Привяжем методы к указателю на себя для использования в обработчиках событий this.outDetectingLoop = this.outDetectingLoop.bind(this); } else { @@ -76,119 +78,106 @@ class OutQueue extends EventEmitter { //оповестим подписчиков о появлении нового отчета this.emit(SEVT_OUT_QUEUE_STOPPED); } - //Установка финальных статусов сообщения в БД - async finalise(prms) { - //Проверяем структуру переданного объекта для старта + //Добавление идентификатора позиции очереди в список обрабатываемых + addInProgress(prms) { + //Проверяем структуру переданного объекта let sCheckResult = validateObject( prms, - prmsOutQueueSchema.finalise, - "Параметры функции установки финальных статусов сообщения в БД" + prmsOutQueueSchema.addInProgress, + "Параметры функции добавления идентификатора позиции очереди в список обрабатываемых" ); //Если структура объекта в норме if (!sCheckResult) { - //Если больше нет попыток исполнения и сообщение не в статусе успешной обработки сервером БД - if ( - prms.queue.nExecState != objQueueSchema.NQUEUE_EXEC_STATE_DB_OK && - prms.queue.nExecCnt >= prms.queue.nRetryAttempts - ) { - //То считаем, что оно выполнено с ошибками и больше пытаться не надо - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: prms.queue.sExecMsg, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - } else { - //Если сообщение успешно исполнено сервером БД - то значит оно успешно исполнено вообще - if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_OK) { - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); - } else { - //Если сообщение в статусе исполнения сервером приложений (чего здесь быть не может) - это ошибка - if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP) { - //То выставим ему ошибку исполнения сервером приложений - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: prms.queue.sExecMsg, - nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR - }); - } else { - //Если сообщение в статусе исполнения сервером БД (чего здесь быть не может) - это ошибка - if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB) { - //То выставим ему ошибку исполнения сервером БД - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: prms.queue.sExecMsg, - nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR - }); - } - } - } + //Проверим, что такого идентификатора ещё нет в списке обрабатываемых + const i = this.inProgress.indexOf(prms.nQueueId); + //Если нет - добавим + if (i === -1) this.inProgress.push(prms.nQueueId); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Удаление идентификатора позиции очереди из списка обрабатываемых + rmInProgress(prms) { + //Проверяем структуру переданного объекта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.rmInProgress, + "Параметры функции удаления идентификатора позиции очереди из списка обрабатываемых" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Если удаляемый идентификатор есть в списке + const i = this.inProgress.indexOf(prms.nQueueId); + //Удалим его + if (i > -1) { + this.inProgress.splice(i, 1); } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } - //Запуск обработки сообщения сервером БД - async dbProcess(prms) { - //Проверяем структуру переданного объекта для старта + //Проверка наличия идентификатора позиции очереди в списке обрабатываемых + isInProgress(prms) { + //Проверяем структуру переданного объекта let sCheckResult = validateObject( prms, - prmsOutQueueSchema.dbProcess, - "Параметры функции запуска обработки ообщения сервером БД" + prmsOutQueueSchema.isInProgress, + "Параметры функции проверки наличия идентификатора позиции очереди в списке обрабатываемых" ); //Если структура объекта в норме if (!sCheckResult) { - //Буфер для текущего состояния сообщения - let curQueue = null; - //Обрабатываем - try { - //Фиксируем начало исполнения сервером БД - в статусе сообщения - curQueue = await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB - }); - //Вызов обработчика БД - curQueue = await this.dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); - //Фиксируем успешное исполнение сервером БД - в статусе сообщения - curQueue = await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK - }); - //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса - await this.logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { - nQueueId: prms.queue.nId - }); - } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку обработки сервером БД - в статусе сообщения - curQueue = await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR - }); - //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса - await this.logger.error( - `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`, - { nQueueId: prms.queue.nId } - ); - } - //Вернём текущее состоянии сообщения очереди - return curQueue; + //Проверим наличие идентификатора в списке + return !(this.inProgress.indexOf(prms.nQueueId) === -1); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Старт обработчика + startQueueProcessor(prms) { + //Проверяем структуру переданного объекта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.startQueueProcessor, + "Параметры функции запуска обработчика сообщения очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Добавляем идентификатор позиции очереди в список обрабатываемых + this.addInProgress({ nQueueId: prms.nQueueId }); + //Отдаём команду дочернему процессу обработчика на старт исполнения + prms.proc.send({ + nQueueId: prms.nQueueId, + connectSettings: this.dbConn.connectSettings + }); + //Уменьшаем количество доступных обработчиков + this.nWorkersLeft--; + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Останов обработчика + stopQueueProcessor(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.stopQueueProcessor, + "Параметры функции останова обработчика сообщения очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Удаляем идентификатор позиции очереди из списка обрабатываемых + this.rmInProgress({ nQueueId: prms.nQueueId }); + //Завершаем дочерний процесс обработчика + prms.proc.kill(); + //Увеличиваем количество доступных обработчиков + this.nWorkersLeft++; } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Запуск обработки очередного сообщения - async processMessage(prms) { - //Проверяем структуру переданного объекта для старта + processMessage(prms) { + //Проверяем структуру переданного объекта let sCheckResult = validateObject( prms, prmsOutQueueSchema.processMessage, @@ -204,30 +193,6 @@ class OutQueue extends EventEmitter { const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); //Текущее состояние сообщения let curQueue = null; - //Скажем что начали обработку - await self.logger.info( - `Обрабатываю исходящее сообщение: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ - prms.queue.sServiceFnCode - }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, - { nQueueId: prms.queue.nId } - ); - //Установим его статус в БД - обрабатывается сервером приложений (только для новых или повторно обрабатываемых сервером приложений) - if ( - prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE || - prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR - ) { - curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP - }); - } - //Установим его статус в БД - обрабатывается в БД (только если сюда пришло сообщение на повторную обработку сервером БД) - if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR) { - curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB - }); - } //Перехват сообщений обработчика proc.on("message", async result => { //Проверяем структуру полученного сообщения @@ -238,168 +203,81 @@ class OutQueue extends EventEmitter { ); //Если структура сообщения в норме if (!sCheckResult) { - //Движение события по статусам в зависимости от того в каком состоянии его вернул обработчик - try { - //Работаем от статуса сообщения полученного от обработчика - switch (result.nExecState) { - //Ошибка обработки - case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: { - //Установим ошибочный статус в БД для сообщений и увеличим счетчик попыток отправки - curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: result.sExecMsg, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: result.nExecState - }); - //Фиксируем ошибку в протоколе работы сервиса - await self.logger.error( - `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${ - result.sExecMsg - }`, - { nQueueId: prms.queue.nId } - ); - break; - } - //Успех обработки - case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: { - //Если состояние менялось (а не просто повторная отработка) - if (result.nExecState != prms.queue.nExecState) { - //Пишем в базу успех отработки сервером приложений - результаты обработки - curQueue = await self.dbConn.setQueueAppSrvResult({ - nQueueId: prms.queue.nId, - blMsg: result.blMsg ? new Buffer(result.blMsg) : null, - blResp: result.blResp ? new Buffer(result.blResp) : null - }); - //Пишем в базу успех отработки сервером приложений - статус сообщения - curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: result.nExecState - }); - //Пишем в базу успех отработки сервером приложений - запись в протокол работы сервера приложений - await self.logger.info( - `Исходящее сообщение ${ - prms.queue.nId - } успешно отработано сервером приложений`, - { nQueueId: prms.queue.nId } - ); - } - //Запускаем обработку сервером БД - curQueue = await self.dbProcess(prms); - break; - } - //Обработчик ничего не делал - default: { - //Обработчик ничего не делал, но если сообщение сообщение в статусе - ошибка обработки сервером БД или обрабатывается сервером БД, то запустим обработчик БД - if (result.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR) { - //Запускаем обработчик сервера БД - curQueue = await self.dbProcess(prms); - } else { - //Во всех остальных случаях - ничего не делаем вообще - curQueue = prms.queue; - } - break; - } - } - } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку обработки сервером приложений - статус сообщения - curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES + if (result.sResult == "ERR") { + //Фиксируем ошибку обработки - протокол работы сервиса + await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, { + nQueueId: prms.queue.nId + }); + //Фиксируем ошибку обработки - статус сообщения + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: result.sMsg, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? prms.queue.nExecState + : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); - //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений - await self.logger.error(sErr, { nQueueId: prms.queue.nId }); } } else { - //Пришел неожиданный ответ обработчика - статус сообщения - curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sCheckResult, - nIncExecCnt: NINC_EXEC_CNT_YES - }); - //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений + //Пришел неожиданный ответ обработчика - запись в протокол работы сервиса await self.logger.error( `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, { nQueueId: prms.queue.nId } ); - } - //Выставляем финальные статусы - try { - await self.finalise({ queue: curQueue }); - } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Установим его статус в БД - ошибка установки финального статуса - await self.dbConn.setQueueState({ + //Фиксируем ошибку обработки - статус сообщения + await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, - sExecMsg: sErr, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений - await self.logger.error(`Фатальная ошибка обработчика сообщения ${prms.queue.nId}: ${sErr}`, { - nQueueId: prms.queue.nId + sExecMsg: `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? prms.queue.nExecState + : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); } //Останавливаем обработчик и инкрементируем флаг их доступного количества - proc.kill(); - this.nWorkersLeft++; + try { + this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); + } catch (e) { + //Отразим в протоколе ошибку останова + await self.logger.error( + `Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, + { nQueueId: prms.queue.nId } + ); + } }); //Перехват ошибок обработчика proc.on("error", async e => { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Установим его статус в БД - ошибка обработки сервером приложений - let curQueue = await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES - }); - //Так же фиксируем ошибку в протоколе работы - await self.logger.error(`Ошибка обработки исходящего сообщения сервером приложений: ${sErr}`, { + //Фиксируем ошибку в протоколе работы + await self.logger.error(`Ошибка обработки исходящего сообщения: ${makeErrorText(e)}`, { nQueueId: prms.queue.nId }); - //Выставляем финальные статусы - try { - await self.finalise({ queue: curQueue }); - } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Установим его статус в БД - ошибка установки финального статуса - await self.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений - await self.logger.error(`Фатальная ошибка обработчика сообщения ${prms.queue.nId}: ${sErr}`, { - nQueueId: prms.queue.nId - }); - } + //Фиксируем ошибку обработки - статус сообщения + await this.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? prms.queue.nExecState + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); //Останавливаем обработчик и инкрементируем флаг их доступного количества - proc.kill(); - this.nWorkersLeft++; + try { + this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); + } catch (e) { + //Отразим в протоколе ошибку останова + await self.logger.error( + `Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, + { nQueueId: prms.queue.nId } + ); + } }); //Перехват останова обработчика proc.on("exit", code => {}); //Запускаем обработчик - proc.send({ - nQueueId: prms.queue.nId, - nExecState: prms.queue.nExecState, - blMsg: prms.queue.blMsg, - blResp: prms.queue.blResp, - service: _.find(this.services, { nId: prms.queue.nServiceId }), - function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { - nId: prms.queue.nServiceFnId - }) - }); - //Уменьшаем количество доступных обработчиков - this.nWorkersLeft--; + this.startQueueProcessor({ nQueueId: prms.queue.nId, proc }); } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -427,38 +305,22 @@ class OutQueue extends EventEmitter { //Обходим их for (let i = 0; i < outMsgs.length; i++) { //И запускаем обработчики - try { - await this.processMessage({ queue: outMsgs[i] }); - } catch (e) { - //Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) - let curQueue = await this.dbConn.setQueueState({ - nQueueId: outMsgs[i].nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES - }); - //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений - await this.logger.error(sErr, { nQueueId: outMsgs[i].nId }); - //Выставляем финальные статусы + if (!this.isInProgress({ nQueueId: outMsgs[i].nId })) { try { - await this.finalise({ queue: curQueue }); + this.processMessage({ queue: outMsgs[i] }); } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Установим его статус в БД - ошибка установки финального статуса - await self.dbConn.setQueueState({ + //Фиксируем ошибку обработки сервером приложений - статус сообщения + await this.dbConn.setQueueState({ nQueueId: outMsgs[i].nId, - sExecMsg: sErr, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + outMsgs[i].nExecCnt + 1 < outMsgs[i].nRetryAttempts + ? outMsgs[i].nExecState + : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); - //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений - await self.logger.error( - `Фатальная ошибка обработчика сообщения ${outMsgs[i].nId}: ${sErr}`, - { nQueueId: outMsgs[i].nId } - ); + //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений + await this.logger.error(makeErrorText(e), { nQueueId: outMsgs[i].nId }); } } } @@ -466,11 +328,8 @@ class OutQueue extends EventEmitter { //Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений await this.restartDetectingLoop(); } catch (e) { - //Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; //Фиксируем ошибку в протоколе работы сервера приложений - await this.logger.error(sErr); + await this.logger.error(makeErrorText(e)); await this.restartDetectingLoop(); } } else { diff --git a/core/out_queue2.js b/core/out_queue2.js deleted file mode 100644 index e46161e..0000000 --- a/core/out_queue2.js +++ /dev/null @@ -1,291 +0,0 @@ -/* - Сервис интеграции ПП Парус 8 с WEB API - Модуль ядра: отработка очереди исходящих сообщений -*/ - -//------------------------------ -// Подключение внешних библиотек -//------------------------------ - -const _ = require("lodash"); //Работа с массивами и коллекциями -const EventEmitter = require("events"); //Обработчик пользовательских событий -const ChildProcess = require("child_process"); //Работа с дочерними процессами -const { ServerError } = require("./server_errors"); //Типовая ошибка -const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы -const { validateObject } = require("./utils"); //Вспомогательные функции -const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД -const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди -const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди -const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса - -//-------------------------- -// Глобальные идентификаторы -//-------------------------- - -//Типовые события -const SEVT_OUT_QUEUE_STARTED = "OUT_QUEUE_STARTED"; //Обработчик очереди запущен -const SEVT_OUT_QUEUE_STOPPED = "OUT_QUEUE_STOPPED"; //Обработчик очереди остановлен - -//Время отложенного старта опроса очереди (мс) -const NDETECTING_LOOP_DELAY = 3000; - -//Интервал проверки завершения обработчиков (мс) -const NWORKERS_WAIT_INTERVAL = 1000; - -//------------ -// Тело модуля -//------------ - -//Класс очереди сообщений -class OutQueue extends EventEmitter { - //Конструктор класса - constructor(prms) { - //Создадим экземпляр родительского класса - super(); - //Проверяем структуру переданного объекта для подключения - let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue"); - //Если структура объекта в норме - if (!sCheckResult) { - //Список обслуживаемых сервисов - this.services = null; - //Признак функционирования обработчика - this.bWorking = false; - //Параметры очереди - this.outGoing = _.cloneDeep(prms.outGoing); - //Количество доступных обработчиков - this.nWorkersLeft = this.outGoing.nMaxWorkers; - //Идентификатор таймера проверки очереди - this.nDetectingLoopTimeOut = null; - //Запомним подключение к БД - this.dbConn = prms.dbConn; - //Запомним логгер - this.logger = prms.logger; - //Список обрабатываемых в текущий момент сообщений очереди - this.inProgress = []; - //Привяжем методы к указателю на себя для использования в обработчиках событий - this.outDetectingLoop = this.outDetectingLoop.bind(this); - } else { - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - } - } - //Уведомление об остановке обработчика очереди - notifyStarted() { - //оповестим подписчиков о появлении нового отчета - this.emit(SEVT_OUT_QUEUE_STARTED); - } - //Уведомление об остановке обработчика очереди - notifyStopped() { - //оповестим подписчиков о появлении нового отчета - this.emit(SEVT_OUT_QUEUE_STOPPED); - } - //Добавление идентификатора позиции очереди в список обрабатываемых - addInProgress(nId) { - const i = this.inProgress.indexOf(nId); - if (i === -1) this.inProgress.push(nId); - } - //Удаление идентификатора позиции очереди из списка обрабатываемых - rmInProgress(nId) { - const i = this.inProgress.indexOf(nId); - if (i > -1) { - this.inProgress.splice(i, 1); - } - } - //Проверка наличия идентификатора позиции очереди в списке обрабатываемых - isInProgress(nId) { - return !(this.inProgress.indexOf(nId) === -1); - } - //Запуск обработки очередного сообщения - processMessage(prms) { - //Проверяем структуру переданного объекта для старта - let sCheckResult = validateObject( - prms, - prmsOutQueueSchema.processMessage, - "Параметры функции запуска обработки очередного сообщения" - ); - //Если структура объекта в норме - if (!sCheckResult) { - //Проверим, что есть доступные обработчики - if (this.nWorkersLeft > 0) { - //Переопределим себя для обращения внутри обработчиков событий - const self = this; - //Создаём новый обработчик сообщений - const proc = ChildProcess.fork("core/out_queue_processor2", { silent: false }); - //Текущее состояние сообщения - let curQueue = null; - //Перехват сообщений обработчика - proc.on("message", async result => { - //Проверяем структуру полученного сообщения - /* - let sCheckResult = validateObject( - result, - objOutQueueProcessorSchema.OutQueueProcessorTaskResult, - "Ответ обработчика очереди исходящих сообщений" - ); - //Если структура сообщения в норме - if (!sCheckResult) { - */ - - /* - } else { - //Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений - await self.logger.error( - `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, - { nQueueId: prms.queue.nId } - ); - } - */ - if (result.sExecResult == "ERR") { - await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sExecMsg}`, { - nQueueId: prms.queue.nId - }); - //Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: result.sExecMsg, - nIncExecCnt: NINC_EXEC_CNT_YES - }); - } - //Останавливаем обработчик и инкрементируем флаг их доступного количества - this.rmInProgress(prms.queue.nId); - proc.kill(); - this.nWorkersLeft++; - }); - //Перехват ошибок обработчика - proc.on("error", async e => { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку в протоколе работы - await self.logger.error(`Ошибка обработки исходящего сообщения: ${sErr}`, { - nQueueId: prms.queue.nId - }); - //Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) - await this.dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES - }); - //Останавливаем обработчик и инкрементируем флаг их доступного количества - this.rmInProgress(prms.queue.nId); - proc.kill(); - this.nWorkersLeft++; - }); - //Перехват останова обработчика - proc.on("exit", code => {}); - //Запускаем обработчик - this.addInProgress(prms.queue.nId); - proc.send({ - nQueueId: prms.queue.nId, - connectSettings: self.dbConn.connectSettings - }); - //Уменьшаем количество доступных обработчиков - this.nWorkersLeft--; - } - } else { - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - } - } - //Перезапуск опроса очереди исходящих сообщений - async restartDetectingLoop() { - //Включаем опрос очереди только если установлен флаг работы - if (this.bWorking) { - this.nDetectingLoopTimeOut = await setTimeout(async () => { - await this.outDetectingLoop(); - }, this.outGoing.nCheckTimeout); - } - } - //Опрос очереди исходящих сообщений - async outDetectingLoop() { - //Если есть свободные обработчики - if (this.nWorkersLeft > 0) { - //Сходим на сервер за очередным исходящим сообщением - try { - //Заберем столько сообщений, сколько можем обработать одновременно - let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft }); - //Если есть сообщения - if (Array.isArray(outMsgs) && outMsgs.length > 0) { - //Обходим их - for (let i = 0; i < outMsgs.length; i++) { - //И запускаем обработчики - if (!this.isInProgress(outMsgs[i].nId)) { - try { - this.processMessage({ queue: outMsgs[i] }); - } catch (e) { - //Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток) - await this.dbConn.setQueueState({ - nQueueId: outMsgs[i].nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES - }); - //Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений - await this.logger.error(sErr, { nQueueId: outMsgs[i].nId }); - } - } - } - } - //Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений - await this.restartDetectingLoop(); - } catch (e) { - //Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку в протоколе работы сервера приложений - await this.logger.error(sErr); - await this.restartDetectingLoop(); - } - } else { - //Нет свободных обработчиков - ждём и перезапускаем цикл опроса - await this.restartDetectingLoop(); - } - } - //Запуск обработки очереди исходящих сообщений - startProcessing(prms) { - //Проверяем структуру переданного объекта для старта - let sCheckResult = validateObject( - prms, - prmsOutQueueSchema.startProcessing, - "Параметры функции запуска обработки очереди исходящих сообщений" - ); - //Если структура объекта в норме - if (!sCheckResult) { - //Выставляем флаг работы - this.bWorking = true; - //запоминаем список обслуживаемых сервисов - this.services = prms.services; - //Начинаем слушать очередь исходящих - setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY); - //И оповещаем всех что запустились - this.notifyStarted(); - } else { - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - } - } - //Остановка обработки очереди исходящих сообщений - stopProcessing() { - //Выставляем флаг неработы - this.bWorking = false; - //Останавливаем опрос очереди - if (this.nDetectingLoopTimeOut) { - clearTimeout(this.nDetectingLoopTimeOut); - this.nDetectingLoopTimeOut = null; - } - //Ждем завершения работы всех обработчиков - let i = setInterval(() => { - if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) { - clearInterval(i); - this.notifyStopped(); - } - }, NWORKERS_WAIT_INTERVAL); - } -} - -//----------------- -// Интерфейс модуля -//----------------- - -exports.SEVT_OUT_QUEUE_STARTED = SEVT_OUT_QUEUE_STARTED; -exports.SEVT_OUT_QUEUE_STOPPED = SEVT_OUT_QUEUE_STOPPED; -exports.OutQueue = OutQueue; diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 91396e2..3827e89 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -7,8 +7,10 @@ // Подключение библиотек //---------------------- -const _ = require("lodash"); //Работа с массивами и коллекциями -const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции +require("module-alias/register"); //Поддержка псевонимов при подключении модулей +const lg = require("./logger"); //Протоколирование работы +const db = require("./db_connector"); //Взаимодействие с БД +const { makeErrorText, makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля @@ -19,8 +21,14 @@ const { SERR_OBJECT_BAD_INTERFACE, SERR_MODULES_NO_MODULE_SPECIFIED } = require("./constants"); //Глобальные константы -//!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!!!!!!! -const fs = require("fs"); +const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД + +//-------------------------- +// Глобальные идентификаторы +//-------------------------- + +let dbConn = null; //Подключение к БД +let logger = null; //Протоколирование работы //------------ // Тело модуля @@ -28,84 +36,162 @@ const fs = require("fs"); //Отправка родительскому процессу ошибки обработки сообщения сервером приложений const sendErrorResult = prms => { - //Проверяем параметры + //Проверяем структуру переданного сообщения let sCheckResult = validateObject( prms, prmsOutQueueProcessorSchema.sendErrorResult, - "Параметры функции отправки ошибки обработки" + "Параметры функции отправки родительскому процессу ошибки обработки сообщения" ); - //Если параметры в норме + //Если структура объекта в норме if (!sCheckResult) { - //Отправляем родительскому процессу ошибку process.send({ - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, - sExecMsg: prms.sMessage, - blMsg: null, - blResp: null + sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, + sMsg: prms.sMessage }); } else { - //Отправляем родительскому процессу сведения об ошибочных параметрах process.send({ - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, - sExecMsg: sCheckResult, - blMsg: null, - blResp: null + sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, + sMsg: sCheckResult }); } }; //Отправка родительскому процессу успеха обработки сообщения сервером приложений -const sendOKResult = prms => { - //Проверяем параметры +const sendOKResult = () => { + process.send({ + sResult: objOutQueueProcessorSchema.STASK_RESULT_OK, + sMsg: null + }); +}; + +//Запуск обработки сообщения сервером приложений +const appProcess = async prms => { + //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, - prmsOutQueueProcessorSchema.sendOKResult, - "Параметры функции отправки ответа об успехе обработки" + prmsOutQueueProcessorSchema.appProcess, + "Параметры функции запуска обработки ообщения сервером приложений" ); - //Если параметры в норме + //Если структура объекта в норме if (!sCheckResult) { - //Отправляем родительскому процессу успех - process.send({ - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK, - sExecMsg: null, - blMsg: prms.blMsg, - blResp: prms.blResp - }); + //Обработанное сообщение + let newQueue = null; + //Обрабатываем + try { + //Фиксируем начало исполнения сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + }); + //Скажем что начали обработку + await logger.info( + `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ + prms.queue.sServiceFnCode + }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + { nQueueId: prms.queue.nId } + ); + if (prms.queue.blMsg) { + let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId; + //Фиксируем успех исполнения + newQueue = await dbConn.setQueueAppSrvResult({ + nQueueId: prms.queue.nId, + blMsg: new Buffer(sMsg), + blResp: new Buffer("REPLAY ON " + prms.queue.nId) + }); + //Фиксируем успешное исполнение сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { + nQueueId: prms.queue.nId + }); + } else { + throw new ServerError( + SERR_UNEXPECTED, + `Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки` + ); + } + } catch (e) { + //Фиксируем ошибку обработки сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`, + { nQueueId: prms.queue.nId } + ); + } + //Возвращаем результат + return newQueue; } else { - //Отправляем родительскому процессу сведения об ошибочных параметрах - process.send({ - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, - sExecMsg: sCheckResult, - blMsg: null, - blResp: null - }); + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } }; -//Отправка родительскому процессу сообщения без обработки -const sendUnChange = prms => { - //Проверяем параметры +//Запуск обработки сообщения сервером БД +const dbProcess = async prms => { + //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, - prmsOutQueueProcessorSchema.sendUnChange, - "Параметры функции отправки сообщения без обработки" + prmsOutQueueProcessorSchema.dbProcess, + "Параметры функции запуска обработки ообщения сервером БД" ); - //Если параметры в норме + //Если структура объекта в норме if (!sCheckResult) { - process.send({ - nExecState: prms.task.nExecState, - sExecMsg: null, - blMsg: prms.task.blMsg ? new Buffer(prms.task.blMsg) : null, - blResp: prms.task.blResp ? new Buffer(prms.task.blResp) : null - }); + //Обрабатываем + try { + //Фиксируем начало исполнения сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB + }); + //Скажем что начали обработку + await logger.info( + `Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ + prms.queue.sServiceFnCode + }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + { nQueueId: prms.queue.nId } + ); + //Вызов обработчика БД + await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); + //Фиксируем успешное исполнение сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { + nQueueId: prms.queue.nId + }); + } catch (e) { + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`, + { nQueueId: prms.queue.nId } + ); + } } else { - //Отправляем родительскому процессу сведения об ошибочных параметрах - process.send({ - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, - sExecMsg: sCheckResult, - blMsg: null, - blResp: null - }); + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } }; @@ -119,38 +205,248 @@ const processTask = async prms => { ); //Если параметры в норме if (!sCheckResult) { - //Обработке подлежат только необработанные сервером приложений сообщения - if ( - prms.task.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE || - prms.task.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR - ) { - setTimeout(() => { - //if (prms.task.nQueueId == 2) { - // sendErrorResult({ sMessage: "Ошибка отработки сообщения " + prms.task.nQueueId }); - //} else { - if (prms.task.blMsg) { - let b = new Buffer(prms.task.blMsg); - fs.writeFile("c:/repos/temp/" + prms.task.nQueueId, b, err => { - if (err) { - sendErrorResult({ sMessage: err.message }); - } else { - let sMsg = b.toString() + " MODIFICATION FOR " + prms.task.nQueueId; - sendOKResult({ - blMsg: new Buffer(sMsg), - blResp: new Buffer("REPLAY ON " + prms.task.nQueueId) - }); + let q = null; + try { + //Создаём подключение к БД + dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings }); + //Создаём логгер для протоколирования работы + logger = new lg.Logger(); + //Подключим логгер к БД (и отключим когда надо) + dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => { + logger.setDBConnector(dbConn, true); + }); + dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => { + logger.removeDBConnector(); + }); + //Подключаемся к БД + await dbConn.connect(); + //Считываем запись очереди + q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId }); + //Далее работаем от статуса считанной записи + switch (q.nExecState) { + //Поставлено в очередь + case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { + //Запускаем обработку сервером приложений + try { + let res = await appProcess({ queue: q }); + //И если она успешно завершилась - обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + try { + await dbProcess({ queue: res }); + } catch (e) { + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: res.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + res.nExecCnt + 1 < res.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${res.nId} сервером БД: ${makeErrorText(e)}`, + { nQueueId: res.nId } + ); + } } - }); - } else { - sendErrorResult({ - sMessage: "Ошибка отработки сообщения " + prms.task.nQueueId + ": нет данных для обработки" - }); + } catch (e) { + //Фиксируем ошибку обработки сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + q.nExecCnt + 1 < q.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`, + { nQueueId: q.nId } + ); + } + break; } - //} - }, 3000); - } else { - //Остальные возвращаем без изменения и отработки, с сохранением статусов и сообщений - sendUnChange(prms); + //Обрабатывается сервером приложений + case objQueueSchema.NQUEUE_EXEC_STATE_APP: { + //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) + await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { + nQueueId: q.nId + }); + break; + } + //Ошибка обработки сервером приложений + case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: { + //Если ещё есть попытки отработки + if (q.nExecCnt < q.nRetryAttempts) { + //Снова запускаем обработку сервером приложений + try { + let res = await appProcess({ queue: q }); + //И если она успешно завершилась - обработку сервоером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + try { + await dbProcess({ queue: res }); + } catch (e) { + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: res.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + res.nExecCnt + 1 < res.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${res.nId} сервером БД: ${makeErrorText( + e + )}`, + { nQueueId: res.nId } + ); + } + } + } catch (e) { + //Фиксируем ошибку обработки сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + q.nExecCnt + 1 < q.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${q.nId} сервером приложений: ${makeErrorText( + e + )}`, + { nQueueId: q.nId } + ); + } + } else { + //Попыток нет - финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: q.sExecMsg, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + } + break; + } + //Успешно обработано сервером приложений + case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: { + //Запускаем обработку в БД + try { + await dbProcess({ queue: q }); + } catch (e) { + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + q.nExecCnt + 1 < q.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${q.nId} сервером БД: ${makeErrorText(e)}`, + { nQueueId: q.nId } + ); + } + break; + } + //Обрабатывается в БД + case objQueueSchema.NQUEUE_EXEC_STATE_DB: { + //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) + await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { + nQueueId: q.nId + }); + break; + } + //Ошибка обработки в БД + case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: { + //Если ещё есть попытки отработки + if (q.nExecCnt < q.nRetryAttempts) { + //Снова запускаем обработку сервером БД + try { + await dbProcess({ queue: q }); + } catch (e) { + //Фиксируем ошибку обработки сервером БД - в статусе сообщения + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: + q.nExecCnt + 1 < q.nRetryAttempts + ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR + : objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса + await logger.error( + `Ошибка обработки исходящего сообщения ${q.nId} сервером БД: ${makeErrorText(e)}`, + { nQueueId: q.nId } + ); + } + } else { + //Попыток нет - финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: q.sExecMsg, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + } + break; + } + //Успешно обработано в БД + case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: { + //Финализируем + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: null, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + break; + } + //Обработано с ошибками + case objQueueSchema.NQUEUE_EXEC_STATE_ERR: { + //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) + await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { + nQueueId: q.nId + }); + break; + } + //Обработано успешно + case objQueueSchema.NQUEUE_EXEC_STATE_OK: { + //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) + await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, { + nQueueId: q.nId + }); + break; + } + default: { + //Ничего не делаем + break; + } + } + //Отключаемся от БД + if (dbConn) await dbConn.disconnect(); + //Отправляем успех + sendOKResult(); + } catch (e) { + //Отключаемся от БД + if (dbConn) await dbConn.disconnect(); + //Отправляем ошибку + sendErrorResult({ sMessage: makeErrorText(e) }); } } else { sendErrorResult({ sMessage: sCheckResult }); @@ -173,7 +469,7 @@ process.on("SIGTERM", () => {}); //Перехват ошибок process.on("uncaughtException", e => { //Отправляем ошибку родительскому процессу - sendErrorResult({ sMessage: e.message }); + sendErrorResult({ sMessage: makeErrorText(e) }); }); //Приём сообщений diff --git a/core/out_queue_processor2.js b/core/out_queue_processor2.js deleted file mode 100644 index 193f4a6..0000000 --- a/core/out_queue_processor2.js +++ /dev/null @@ -1,381 +0,0 @@ -/* - Сервис интеграции ПП Парус 8 с WEB API - Модуль ядра: обработчик исходящего сообщения -*/ - -//---------------------- -// Подключение библиотек -//---------------------- - -require("module-alias/register"); //Поддержка псевонимов при подключении модулей -const _ = require("lodash"); //Работа с массивами и коллекциями -const lg = require("./logger"); //Протоколирование работы -const db = require("./db_connector"); //Взаимодействие с БД -const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции -const { ServerError } = require("./server_errors"); //Типовая ошибка -const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений -const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля -const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди -const { - SERR_UNEXPECTED, - SERR_MODULES_BAD_INTERFACE, - SERR_OBJECT_BAD_INTERFACE, - SERR_MODULES_NO_MODULE_SPECIFIED -} = require("./constants"); //Глобальные константы -const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД - -//---------- -// Константы -//---------- - -//-------------------------- -// Глобальные идентификаторы -//-------------------------- - -let dbConn = null; //Подключение к БД -let logger = null; //Протоколирование работы - -//------------ -// Тело модуля -//------------ - -//Отправка родительскому процессу ошибки обработки сообщения сервером приложений -const sendErrorResult = sMessage => { - process.send({ - sExecResult: "ERR", - sExecMsg: sMessage - }); -}; - -//Отправка родительскому процессу успеха обработки сообщения сервером приложений -const sendOKResult = () => { - process.send({ - sExecResult: "OK", - sExecMsg: null - }); -}; - -//Запись в файл !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!! -const writeToFile = queue => { - return new Promise((resolve, reject) => { - const fs = require("fs"); - fs.writeFile("c:/repos/temp/" + queue.nId, queue.blMsg, err => { - if (err) { - reject(new ServerError(SERR_UNEXPECTED, `Ошибка отработки сообщения ${prms.queue.nId}`)); - } else { - resolve(); - } - }); - }); -}; - -//Запуск обработки сообщения сервером приложений -const appProcess = async prms => { - //Обработанное сообщение - let newQueue = null; - //Обрабатываем - try { - //Фиксируем начало исполнения сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP - }); - //Скажем что начали обработку - await logger.info( - `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ - prms.queue.sServiceFnCode - }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, - { nQueueId: prms.queue.nId } - ); - if (prms.queue.blMsg) { - await writeToFile(prms.queue); - let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId; - //Фиксируем успех исполнения - newQueue = await dbConn.setQueueAppSrvResult({ - nQueueId: prms.queue.nId, - blMsg: new Buffer(sMsg), - blResp: new Buffer("REPLAY ON " + prms.queue.nId) - }); - //Фиксируем успешное исполнение сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK - }); - //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса - await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { - nQueueId: prms.queue.nId - }); - } else { - throw new ServerError( - SERR_UNEXPECTED, - `Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки` - ); - } - } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку обработки сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: - prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts - ? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса - await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${sErr}`, { - nQueueId: prms.queue.nId - }); - } - //Возвращаем результат - return newQueue; -}; - -//Запуск обработки сообщения сервером БД -const dbProcess = async prms => { - //Проверяем структуру переданного объекта для старта - //let sCheckResult = validateObject( - // prms, - // prmsOutQueueSchema.dbProcess, - // "Параметры функции запуска обработки ообщения сервером БД" - //); - //Если структура объекта в норме - //if (!sCheckResult) { - //Обрабатываем - try { - //Фиксируем начало исполнения сервером БД - в статусе сообщения - await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB - }); - //Скажем что начали обработку - await logger.info( - `Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ - prms.queue.sServiceFnCode - }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, - { nQueueId: prms.queue.nId } - ); - //Вызов обработчика БД - await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); - //Фиксируем успешное исполнение сервером БД - в статусе сообщения - await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); - //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса - await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { - nQueueId: prms.queue.nId - }); - } catch (e) { - //Сформируем текст ошибки - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - //Фиксируем ошибку обработки сервером БД - в статусе сообщения - await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - sExecMsg: sErr, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: - prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts - ? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR - : objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса - await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`, { - nQueueId: prms.queue.nId - }); - } - //} else { - // throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - //} -}; - -//Протоколирование предупреждения о ненадлежащем статусе сообщения -const warnBadStateForProcess = async prms => { - //Предупредим о неверном статусе сообщения (такие сюда попадать не должны) - await logger.warn(`Cообщение ${prms.queue.nId} в статусе ${prms.queue.sExecState} попало в очередь обработчика`, { - nQueueId: prms.queue.nId - }); -}; - -//Обработка задачи -const processTask = async prms => { - //Проверяем параметры - /* - let sCheckResult = validateObject( - prms, - prmsOutQueueProcessorSchema.processTask, - "Параметры функции обработки задачи" - ); - */ - //Если параметры в норме - //if (!sCheckResult) { - let q = null; - try { - //Создаём подключение к БД - dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings }); - //Создаём логгер для протоколирования работы - logger = new lg.Logger(); - //Подключим логгер к БД (и отключим когда надо) - dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => { - logger.setDBConnector(dbConn, true); - }); - dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => { - logger.removeDBConnector(); - }); - //Подключаемся к БД - await dbConn.connect(); - //Считываем запись очереди - q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId }); - //Далее работаем от статуса считанной записи - switch (q.nExecState) { - //Поставлено в очередь - case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { - //Запускаем обработку сервером приложений - let res = await appProcess({ queue: q }); - //И если она успешно завершилась - обработку сервоером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res }); - break; - } - //Обрабатывается сервером приложений - case objQueueSchema.NQUEUE_EXEC_STATE_APP: { - //Ничего не делаем, но предупредим что так быть не должно - await warnBadStateForProcess({ queue: q }); - break; - } - //Ошибка обработки сервером приложений - case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: { - //Если ещё есть попытки отработки - if (q.nExecCnt < q.nRetryAttempts) { - //Снова запускаем обработку сервером приложений - let res = await appProcess({ queue: q }); - //И если она успешно завершилась - обработку сервоером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res }); - } else { - //Попыток нет - финализируем обработку - await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: q.sExecMsg, - nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - } - break; - } - //Успешно обработано сервером приложений - case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: { - //Запускаем обработку в БД - await dbProcess({ queue: q }); - break; - } - //Обрабатывается в БД - case objQueueSchema.NQUEUE_EXEC_STATE_DB: { - //Ничего не делаем, но предупредим что так быть не должно - await warnBadStateForProcess({ queue: q }); - break; - } - //Ошибка обработки в БД - case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: { - //Если ещё есть попытки отработки - if (q.nExecCnt < q.nRetryAttempts) { - //Снова запускаем обработку сервером БД - await dbProcess({ queue: q }); - } else { - //Попыток нет - финализируем обработку - await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: q.sExecMsg, - nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - } - break; - } - //Успешно обработано в БД - case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: { - //Финализируем - await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: null, - nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); - break; - } - //Обработано с ошибками - case objQueueSchema.NQUEUE_EXEC_STATE_ERR: { - //Ничего не делаем, но предупредим что так быть не должно - await warnBadStateForProcess({ queue: q }); - break; - } - //Обработано успешно - case objQueueSchema.NQUEUE_EXEC_STATE_OK: { - //Ничего не делаем, но предупредим что так быть не должно - await warnBadStateForProcess({ queue: q }); - break; - } - default: { - //Ничего не делаем - break; - } - } - //Отключаемся от БД - if (dbConn) await dbConn.disconnect(); - //Отправляем успех - sendOKResult(); - } catch (e) { - //Отключаемся от БД - if (dbConn) await dbConn.disconnect(); - //Отправляем ошибку - let sErr = `${SERR_UNEXPECTED}: ${e.message}`; - if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`; - sendErrorResult(sErr); - } - //Отправим родителю информацию о том, что закончили обработку - - //} else { - // sendErrorResult({ sMessage: sCheckResult }); - //} -}; - -//--------------------------------- -// Управление процессом обработчика -//--------------------------------- - -//Перехват CTRL + C (останов процесса) -process.on("SIGINT", () => {}); - -//Перехват CTRL + \ (останов процесса) -process.on("SIGQUIT", () => {}); - -//Перехват мягкого останова процесса -process.on("SIGTERM", () => {}); - -//Перехват ошибок -process.on("uncaughtException", e => { - //Отправляем ошибку родительскому процессу - sendErrorResult(e.message); -}); - -//Приём сообщений -process.on("message", task => { - //Проверяем структуру переданного сообщения - /* - let sCheckResult = validateObject( - task, - objOutQueueProcessorSchema.OutQueueProcessorTask, - "Задача обработчика очереди исходящих сообщений" - ); - */ - //Если структура объекта в норме - //if (!sCheckResult) { - //Запускаем обработку - processTask({ task }); - //} else { - // throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - //} -}); diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index b22093e..8460e2a 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -8,50 +8,24 @@ //---------------------- const Schema = require("validate"); //Схемы валидации -const { Service } = require("./obj_service"); //Схема валидации сервиса -const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса -const { - NQUEUE_EXEC_STATE_INQUEUE, - NQUEUE_EXEC_STATE_APP, - NQUEUE_EXEC_STATE_APP_OK, - NQUEUE_EXEC_STATE_APP_ERR, - NQUEUE_EXEC_STATE_DB, - NQUEUE_EXEC_STATE_DB_OK, - NQUEUE_EXEC_STATE_DB_ERR, - NQUEUE_EXEC_STATE_OK, - NQUEUE_EXEC_STATE_ERR -} = require("./obj_queue"); //Схема валидации сообщения очереди обмена +const { dbConnect } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений -//------------ -// Тело модуля -//------------ +//---------- +// Константы +//---------- -//Валидация данных сообщения очереди -const validateBuffer = val => { - //Либо null - if (val === null) { - return true; - } else { - //Либо данные для формирования Buffer - const s = new Schema({ - type: { - type: String, - required: true - }, - data: { - type: Array, - required: true - } - }); - const errs = s.validate(val, { strip: false }); - return errs.length == 0; - } -}; +//Состояния обработки сообщений очереди обмена +const STASK_RESULT_OK = "OK"; //Обработано успешно +const STASK_RESULT_ERR = "ERR"; //Обработано с ошибками //------------------ // Интерфейс модуля //------------------ +//Константы +exports.STASK_RESULT_OK = STASK_RESULT_OK; +exports.STASK_RESULT_ERR = STASK_RESULT_ERR; + //Схема валидации задачи обработчику очереди исходящих сообщений exports.OutQueueProcessorTask = new Schema({ //Идентификатор записи журнала обмена для обработки @@ -64,121 +38,39 @@ exports.OutQueueProcessorTask = new Schema({ required: path => `Не указан идентификатор записи журнала обмена для обработки (${path})` } }, - //Состояние обработки сообщения очереди обмена - nExecState: { - type: Number, - enum: [ - NQUEUE_EXEC_STATE_INQUEUE, - NQUEUE_EXEC_STATE_APP, - NQUEUE_EXEC_STATE_APP_OK, - NQUEUE_EXEC_STATE_APP_ERR, - NQUEUE_EXEC_STATE_DB, - NQUEUE_EXEC_STATE_DB_OK, - NQUEUE_EXEC_STATE_DB_ERR, - NQUEUE_EXEC_STATE_OK, - NQUEUE_EXEC_STATE_ERR - ], + //Параметры подключения к БД + connectSettings: { + schema: dbConnect, required: true, message: { - type: path => - `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, - enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`, - required: path => `Не указано состояние обработки сообщения очереди обмена (${path})` - } - }, - //Данные сообщения очереди обмена - blMsg: { - use: { validateBuffer }, - required: true, - message: { - validateBuffer: path => - `Данные записи журнала обмена для обработки (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`, - required: path => `Не указаны данные сообщения очереди обмена (${path})` - } - }, - //Данные ответа на сообщение очереди обмена - blResp: { - use: { validateBuffer }, - required: true, - message: { - validateBuffer: path => - `Данные ответа (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`, - required: path => `Не указаны данные ответа (${path})` - } - }, - //Cервис-обработчик - service: { - schema: Service, - required: true, - message: { - required: path => `Не указан сервис для обработки сообщения очереди (${path})` - } - }, - //Функция сервиса-обработчика - function: { - schema: ServiceFunction, - required: true, - message: { - required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` + required: path => `Не указаны параметры подключения к БД (${path})` } } -}).validator({ - required: val => typeof val != "undefined" }); //Схема валидации ответа обработчика очереди исходящих сообщений exports.OutQueueProcessorTaskResult = new Schema({ //Состояние обработки сообщения очереди обмена - nExecState: { - type: Number, - enum: [ - NQUEUE_EXEC_STATE_INQUEUE, - NQUEUE_EXEC_STATE_APP, - NQUEUE_EXEC_STATE_APP_OK, - NQUEUE_EXEC_STATE_APP_ERR, - NQUEUE_EXEC_STATE_DB, - NQUEUE_EXEC_STATE_DB_OK, - NQUEUE_EXEC_STATE_DB_ERR, - NQUEUE_EXEC_STATE_OK, - NQUEUE_EXEC_STATE_ERR - ], + sResult: { + type: String, + enum: [STASK_RESULT_OK, STASK_RESULT_ERR], required: true, message: { type: path => - `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`, required: path => `Не указано состояние обработки сообщения очереди обмена (${path})` } }, //Информация от обработчика сообщения очереди обмена - sExecMsg: { + sMsg: { type: String, - required: false, + required: true, message: { type: path => `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})` } - }, - //Данные сообщения очереди обмена - blMsg: { - use: { validateBuffer }, - required: true, - message: { - validateBuffer: path => - `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`, - required: path => `Не указаны данные сообщения очереди обмена (${path})` - } - }, - //Данные ответа сообщения очереди обмена - blResp: { - use: { validateBuffer }, - required: true, - message: { - validateBuffer: path => - `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`, - required: path => `Не указаны данные ответа сообщения очереди обмена (${path})` - } } }).validator({ required: val => typeof val != "undefined" diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index b97212a..636f2d0 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -14,6 +14,14 @@ const { Queue } = require("./obj_queue"); //Схема валидации соо const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД const { Logger } = require("../core/logger"); //Класс для протоколирования работы +//------------- +// Тело модуля +//------------- + +const validateChildProcess = val => { + return val["constructor"]["name"] === "ChildProcess"; +}; + //------------------ // Интерфейс модуля //------------------ @@ -25,7 +33,7 @@ exports.OutQueue = new Schema({ schema: outGoing, required: true, message: { - required: "Не указаны параметры обработки очереди исходящих сообщений (outGoing)" + required: path => `Не указаны параметры обработки очереди исходящих сообщений (${path})` } }, //Объект для взаимодействия с БД @@ -33,8 +41,9 @@ exports.OutQueue = new Schema({ type: DBConnector, required: true, message: { - type: "Объект для взаимодействия с БД (dbConn) имеет некорректный тип данных (ожидалось - DBConnector)", - required: "Не указан объект для взаимодействия с БД (dbConn)" + type: path => + `Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`, + required: path => `Не указан объект для взаимодействия с БД (${path})` } }, //Объект для протоколирования работы @@ -42,32 +51,94 @@ exports.OutQueue = new Schema({ type: Logger, required: true, message: { - type: "Объект для протоколирования работы (logger) имеет некорректный тип данных (ожидалось - Logger)", - required: "Не указаны объект для протоколирования работы (logger)" + type: path => + `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, + required: path => `Не указаны объект для протоколирования работы (${path})` } } }); -//Схема валидации параметров функции установки финальных статусов сообщения в БД -exports.finalise = new Schema({ - //Обрабатываемое исходящее сообщение - queue: { - schema: Queue, +//Схема валидации параметров функции добавления идентификатора сообщения очереди в список обрабатываемых +exports.addInProgress = new Schema({ + //Идентификатор сообщения + nQueueId: { + type: Number, required: true, message: { - required: "Не указано обрабатываемое исходящее сообщение (queue)" + type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор сообщения (${path})` } } }); -//Схема валидации параметров функции запуска обработчика БД -exports.dbProcess = new Schema({ - //Обрабатываемое исходящее сообщение - queue: { - schema: Queue, +//Схема валидации параметров функции удаления идентификатора сообщения очереди из списка обрабатываемых +exports.rmInProgress = new Schema({ + //Идентификатор сообщения + nQueueId: { + type: Number, required: true, message: { - required: "Не указано обрабатываемое исходящее сообщение (queue)" + type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор сообщения (${path})` + } + } +}); + +//Схема валидации параметров функции проверки наличия идентификатора сообщения очереди в списке обрабатываемых +exports.isInProgress = new Schema({ + //Идентификатор сообщения + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор сообщения (${path})` + } + } +}); + +//Схема валидации параметров функции запуска обработчика сообщения очереди +exports.startQueueProcessor = new Schema({ + //Идентификатор сообщения + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор сообщения (${path})` + } + }, + //Процесс обработчика + proc: { + use: { validateChildProcess }, + required: true, + message: { + validateChildProcess: path => + `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`, + required: path => `Не указан процесс обработчика (${path})` + } + } +}); + +//Схема валидации параметров функции останова обработчика сообщения очереди +exports.stopQueueProcessor = new Schema({ + //Идентификатор сообщения + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор сообщения (${path})` + } + }, + //Процесс обработчика + proc: { + use: { validateChildProcess }, + required: true, + message: { + validateChildProcess: path => + `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`, + required: path => `Не указан процесс обработчика (${path})` } } }); @@ -79,7 +150,7 @@ exports.processMessage = new Schema({ schema: Queue, required: true, message: { - required: "Не указано обрабатываемое исходящее сообщение (queue)" + required: path => `Не указано обрабатываемое исходящее сообщение (${path})` } } }); diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js index 34fbb96..63ca205 100644 --- a/models/prms_out_queue_processor.js +++ b/models/prms_out_queue_processor.js @@ -8,23 +8,9 @@ //---------------------- const Schema = require("validate"); //Схемы валидации +const { Queue } = require("./obj_queue"); //Схема валидации позиции очереди const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений -//------------ -// Тело модуля -//------------ - -//Валидация данных сообщения очереди -const validateBuffer = val => { - //Либо null - if (val === null) { - return true; - } else { - //Либо Buffer - return val instanceof Buffer; - } -}; - //------------------ // Интерфейс модуля //------------------ @@ -42,38 +28,26 @@ exports.sendErrorResult = new Schema({ } }); -//Схема валидации параметров функции отправки успеха обработки -exports.sendOKResult = new Schema({ - //Данные сообщения очереди обмена - blMsg: { - use: { validateBuffer }, +//Схема валидации параметров функции обработчки сообщения сервером приложений +exports.appProcess = new Schema({ + //Обрабатываемое сообщение очереди + queue: { + schema: Queue, required: true, message: { - validateBuffer: path => - `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, - required: path => `Не указаны данные сообщения очереди обмена (${path})` - } - }, - //Данные ответа сообщения очереди обмена - blResp: { - use: { validateBuffer }, - required: true, - message: { - validateBuffer: path => - `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, - required: path => `Не указаны данные ответа сообщения очереди обмена (${path})` + required: path => `Не указано обрабатываемое сообщение очреди (${path})` } } -}).validator({ required: val => typeof val != "undefined" }); +}); -//Параметры функции отправки сообщения родителю без обработки -exports.sendUnChange = new Schema({ - //Задача обработки - task: { - schema: OutQueueProcessorTask, +//Схема валидации параметров функции обработчки сообщения сервером БД +exports.dbProcess = new Schema({ + //Обрабатываемое сообщение очереди + queue: { + schema: Queue, required: true, message: { - required: path => `Не указана задача для обработки (${path})` + required: path => `Не указано обрабатываемое сообщение очреди (${path})` } } });