From 8fe4f979f910b225d1bc65684049f95a211dc04e Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Fri, 7 Dec 2018 18:24:05 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D0=B4=D0=B0=D1=87?= =?UTF-8?q?=D0=B0=20=D0=BE=D0=BF=D0=B8=D1=81=D0=B0=D0=BD=D0=B8=D1=8F=20?= =?UTF-8?q?=D1=81=D0=B5=D1=80=D0=B2=D0=B8=D1=81=D0=B0=20=D0=B8=20=D1=84?= =?UTF-8?q?=D1=83=D0=BD=D0=BA=D1=86=D0=B8=D0=B8=20=D0=BE=D0=B1=D1=80=D0=B0?= =?UTF-8?q?=D0=B1=D0=BE=D1=82=D1=87=D0=B8=D0=BA=D1=83=20=D1=81=D0=BE=D0=BE?= =?UTF-8?q?=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BE=D1=87=D0=B5=D1=80?= =?UTF-8?q?=D0=B5=D0=B4=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue.js | 12 ++++--- core/out_queue_processor.js | 52 ++++++++++++++++-------------- models/obj_out_queue_processor.js | 18 +++++++++++ models/prms_out_queue.js | 9 +++--- models/prms_out_queue_processor.js | 18 +++++++++++ 5 files changed, 75 insertions(+), 34 deletions(-) diff --git a/core/out_queue.js b/core/out_queue.js index 661bcce..18f9154 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -143,11 +143,15 @@ class OutQueue extends EventEmitter { //Если структура объекта в норме if (!sCheckResult) { //Добавляем идентификатор позиции очереди в список обрабатываемых - this.addInProgress({ nQueueId: prms.nQueueId }); + this.addInProgress({ nQueueId: prms.queue.nId }); //Отдаём команду дочернему процессу обработчика на старт исполнения prms.proc.send({ - nQueueId: prms.nQueueId, - connectSettings: this.dbConn.connectSettings + nQueueId: prms.queue.nId, + connectSettings: this.dbConn.connectSettings, + service: _.find(this.services, { nId: prms.queue.nServiceId }), + function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { + nId: prms.queue.nServiceFnId + }) }); //Уменьшаем количество доступных обработчиков this.nWorkersLeft--; @@ -277,7 +281,7 @@ class OutQueue extends EventEmitter { //Перехват останова обработчика proc.on("exit", code => {}); //Запускаем обработчик - this.startQueueProcessor({ nQueueId: prms.queue.nId, proc }); + this.startQueueProcessor({ queue: prms.queue, proc }); } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3827e89..57559e2 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -90,29 +90,23 @@ const appProcess = async prms => { }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, { nQueueId: prms.queue.nId } ); - if (prms.queue.blMsg) { - let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId; - //Фиксируем успех исполнения - newQueue = await dbConn.setQueueAppSrvResult({ - nQueueId: prms.queue.nId, - blMsg: new Buffer(sMsg), - blResp: new Buffer("REPLAY ON " + prms.queue.nId) - }); - //Фиксируем успешное исполнение сервером приложений - в статусе сообщения - newQueue = await dbConn.setQueueState({ - nQueueId: prms.queue.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK - }); - //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса - await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { - nQueueId: prms.queue.nId - }); - } else { - throw new ServerError( - SERR_UNEXPECTED, - `Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки` - ); - } + let sMsg = + (prms.queue.blMsg ? prms.queue.blMsg.toString() : "null") + " MODIFICATION FOR " + prms.queue.nId; + //Фиксируем успех исполнения + newQueue = await dbConn.setQueueAppSrvResult({ + nQueueId: prms.queue.nId, + blMsg: new Buffer(sMsg), + blResp: new Buffer("REPLAY ON " + prms.queue.nId) + }); + //Фиксируем успешное исполнение сервером приложений - в статусе сообщения + newQueue = await dbConn.setQueueState({ + nQueueId: prms.queue.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { + nQueueId: prms.queue.nId + }); } catch (e) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения newQueue = await dbConn.setQueueState({ @@ -228,7 +222,11 @@ const processTask = async prms => { case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { //Запускаем обработку сервером приложений try { - let res = await appProcess({ queue: q }); + let res = await appProcess({ + queue: q, + service: prms.task.service, + function: prms.task.function + }); //И если она успешно завершилась - обработку сервером БД if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { try { @@ -284,7 +282,11 @@ const processTask = async prms => { if (q.nExecCnt < q.nRetryAttempts) { //Снова запускаем обработку сервером приложений try { - let res = await appProcess({ queue: q }); + let res = await appProcess({ + queue: q, + service: prms.task.service, + function: prms.task.function + }); //И если она успешно завершилась - обработку сервоером БД if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { try { diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index 8460e2a..cad174f 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -9,6 +9,8 @@ const Schema = require("validate"); //Схемы валидации const { dbConnect } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений +const { Service } = require("./obj_service"); //Схема валидации сервиса +const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса //---------- // Константы @@ -45,6 +47,22 @@ exports.OutQueueProcessorTask = new Schema({ message: { required: path => `Не указаны параметры подключения к БД (${path})` } + }, + //Cервис-обработчик + service: { + schema: Service, + required: true, + message: { + required: path => `Не указан сервис для обработки сообщения очереди (${path})` + } + }, + //Функция сервиса-обработчика + function: { + schema: ServiceFunction, + required: true, + message: { + required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` + } } }); diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 636f2d0..4c9188f 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -99,13 +99,12 @@ exports.isInProgress = new Schema({ //Схема валидации параметров функции запуска обработчика сообщения очереди exports.startQueueProcessor = new Schema({ - //Идентификатор сообщения - nQueueId: { - type: Number, + //Обрабатываемое сообщение очереди + queue: { + schema: Queue, required: true, message: { - type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, - required: path => `Не указан идентификатор сообщения (${path})` + required: path => `Не указано обрабатываемое сообщение очреди (${path})` } }, //Процесс обработчика diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js index 63ca205..11788be 100644 --- a/models/prms_out_queue_processor.js +++ b/models/prms_out_queue_processor.js @@ -10,6 +10,8 @@ const Schema = require("validate"); //Схемы валидации const { Queue } = require("./obj_queue"); //Схема валидации позиции очереди const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений +const { Service } = require("./obj_service"); //Схема валидации сервиса +const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса //------------------ // Интерфейс модуля @@ -37,6 +39,22 @@ exports.appProcess = new Schema({ message: { required: path => `Не указано обрабатываемое сообщение очреди (${path})` } + }, + //Cервис-обработчик + service: { + schema: Service, + required: true, + message: { + required: path => `Не указан сервис для обработки сообщения очереди (${path})` + } + }, + //Функция сервиса-обработчика + function: { + schema: ServiceFunction, + required: true, + message: { + required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` + } } });