diff --git a/core/out_queue.js b/core/out_queue.js index 661bcce..18f9154 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -143,11 +143,15 @@ class OutQueue extends EventEmitter { //Если структура объекта в норме if (!sCheckResult) { //Добавляем идентификатор позиции очереди в список обрабатываемых - this.addInProgress({ nQueueId: prms.nQueueId }); + this.addInProgress({ nQueueId: prms.queue.nId }); //Отдаём команду дочернему процессу обработчика на старт исполнения prms.proc.send({ - nQueueId: prms.nQueueId, - connectSettings: this.dbConn.connectSettings + nQueueId: prms.queue.nId, + connectSettings: this.dbConn.connectSettings, + service: _.find(this.services, { nId: prms.queue.nServiceId }), + function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { + nId: prms.queue.nServiceFnId + }) }); //Уменьшаем количество доступных обработчиков this.nWorkersLeft--; @@ -277,7 +281,7 @@ class OutQueue extends EventEmitter { //Перехват останова обработчика proc.on("exit", code => {}); //Запускаем обработчик - this.startQueueProcessor({ nQueueId: prms.queue.nId, proc }); + this.startQueueProcessor({ queue: prms.queue, proc }); } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3827e89..57559e2 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -90,29 +90,23 @@ const appProcess = async prms => { }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, { nQueueId: prms.queue.nId } ); - if (prms.queue.blMsg) { - let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId; - //Фиксируем успех исполнения - newQueue = await dbConn.setQueueAppSrvResult({ - nQueueId: prms.queue.nId, - blMsg: new Buffer(sMsg), - blResp: new Buffer("REPLAY ON " + prms.queue.nId) - }); - //Фиксируем успешное исполнение сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK - }); - //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса - await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { - nQueueId: prms.queue.nId - }); - } else { - throw new ServerError( - SERR_UNEXPECTED, - `Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки` - ); - } + let sMsg = + (prms.queue.blMsg ? prms.queue.blMsg.toString() : "null") + " MODIFICATION FOR " + prms.queue.nId; + //Фиксируем успех исполнения + newQueue = await dbConn.setQueueAppSrvResult({ + nQueueId: prms.queue.nId, + blMsg: new Buffer(sMsg), + blResp: new Buffer("REPLAY ON " + prms.queue.nId) + }); + //Фиксируем успешное исполнение сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { + nQueueId: prms.queue.nId + }); } catch (e) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения newQueue = await dbConn.setQueueState({ @@ -228,7 +222,11 @@ const processTask = async prms => { case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { //Запускаем обработку сервером приложений try { - let res = await appProcess({ queue: q }); + let res = await appProcess({ + queue: q, + service: prms.task.service, + function: prms.task.function + }); //И если она успешно завершилась - обработку сервером БД if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { try { @@ -284,7 +282,11 @@ const processTask = async prms => { if (q.nExecCnt < q.nRetryAttempts) { //Снова запускаем обработку сервером приложений try { - let res = await appProcess({ queue: q }); + let res = await appProcess({ + queue: q, + service: prms.task.service, + function: prms.task.function + }); //И если она успешно завершилась - обработку сервоером БД if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { try { diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index 8460e2a..cad174f 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -9,6 +9,8 @@ const Schema = require("validate"); //Схемы валидации const { dbConnect } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений +const { Service } = require("./obj_service"); //Схема валидации сервиса +const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса //---------- // Константы @@ -45,6 +47,22 @@ exports.OutQueueProcessorTask = new Schema({ message: { required: path => `Не указаны параметры подключения к БД (${path})` } + }, + //Cервис-обработчик + service: { + schema: Service, + required: true, + message: { + required: path => `Не указан сервис для обработки сообщения очереди (${path})` + } + }, + //Функция сервиса-обработчика + function: { + schema: ServiceFunction, + required: true, + message: { + required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` + } } }); diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 636f2d0..4c9188f 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -99,13 +99,12 @@ exports.isInProgress = new Schema({ //Схема валидации параметров функции запуска обработчика сообщения очереди exports.startQueueProcessor = new Schema({ - //Идентификатор сообщения - nQueueId: { - type: Number, + //Обрабатываемое сообщение очереди + queue: { + schema: Queue, required: true, message: { - type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, - required: path => `Не указан идентификатор сообщения (${path})` + required: path => `Не указано обрабатываемое сообщение очреди (${path})` } }, //Процесс обработчика diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js index 63ca205..11788be 100644 --- a/models/prms_out_queue_processor.js +++ b/models/prms_out_queue_processor.js @@ -10,6 +10,8 @@ const Schema = require("validate"); //Схемы валидации const { Queue } = require("./obj_queue"); //Схема валидации позиции очереди const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений +const { Service } = require("./obj_service"); //Схема валидации сервиса +const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса //------------------ // Интерфейс модуля @@ -37,6 +39,22 @@ exports.appProcess = new Schema({ message: { required: path => `Не указано обрабатываемое сообщение очреди (${path})` } + }, + //Cервис-обработчик + service: { + schema: Service, + required: true, + message: { + required: path => `Не указан сервис для обработки сообщения очереди (${path})` + } + }, + //Функция сервиса-обработчика + function: { + schema: ServiceFunction, + required: true, + message: { + required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` + } } });