From ca445a40837525bafb66ce509bedb3db9470e95d Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Mon, 24 Dec 2018 14:00:24 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9C=D0=BE=D0=B4=D0=B8=D1=84=D0=B8=D1=86?= =?UTF-8?q?=D0=B8=D1=80=D0=BE=D0=B2=D0=B0=D0=BD=20=D0=B0=D0=BB=D0=B3=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D1=82=D0=BC=20=D0=B4=D0=B2=D0=B8=D0=B6=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D1=8F=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=B0=D1=82=D1=8B?= =?UTF-8?q?=D0=B2=D0=B0=D0=B5=D0=BC=D0=BE=D0=B9=20=D0=BF=D0=BE=D0=B7=D0=B8?= =?UTF-8?q?=D1=86=D0=B8=D0=B8=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B8?= =?UTF-8?q?=20=D0=BF=D0=BE=20=D1=81=D1=82=D0=B0=D1=82=D1=83=D1=81=D0=BD?= =?UTF-8?q?=D0=BE=D0=B9=20=D0=BC=D0=BE=D0=B4=D0=B5=D0=BB=D0=B8=20-=20?= =?UTF-8?q?=D1=81=D1=82=D0=B0=D0=BB=D0=BE=20=D1=83=D1=81=D1=82=D0=BE=D0=B9?= =?UTF-8?q?=D1=87=D0=B8=D0=B2=D0=B5=D0=B9=20=D0=B2=20=D1=81=D0=BB=D1=83?= =?UTF-8?q?=D1=87=D0=B0=D0=B5=20=D0=B2=D0=BE=D0=B7=D0=BD=D0=B8=D0=BA=D0=BD?= =?UTF-8?q?=D0=BE=D0=B2=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BD=D0=B5=D0=BE=D0=B6?= =?UTF-8?q?=D0=B8=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20=D0=BE=D1=82=D0=B2?= =?UTF-8?q?=D0=B5=D1=82=D0=BE=D0=B2=20=D0=BE=D0=B1=D1=80=D0=B0=D0=BE=D1=82?= =?UTF-8?q?=D1=87=D0=B8=D0=BA=D0=B0=20(=D1=82=D0=B5=D0=BF=D0=B5=D1=80?= =?UTF-8?q?=D1=8C=20=D0=B0=D0=BD=D0=B0=D0=BB=D0=B8=D0=B7=D0=B8=D1=80=D1=83?= =?UTF-8?q?=D0=B5=D0=BC=20=D0=BC=D0=B5=D0=BD=D1=8F=D0=BB=D0=BE=D1=81=D1=8C?= =?UTF-8?q?=20=D0=BB=D0=B8=20=D0=BA=D0=BE=D0=BB=D0=B8=D1=87=D0=B5=D1=81?= =?UTF-8?q?=D1=82=D0=B2=D0=BE=20=D0=BF=D0=BE=D0=BF=D1=8B=D1=82=D0=BE=D0=BA?= =?UTF-8?q?=20=D0=B8=D1=81=D0=BF=D0=BE=D0=BB=D0=BD=D0=B5=D0=BD=D0=B8=D1=8F?= =?UTF-8?q?=20=D1=81=20=D0=BC=D0=BE=D0=BC=D0=B5=D0=BD=D1=82=D0=B0=20=D0=B7?= =?UTF-8?q?=D0=B0=D0=BF=D1=83=D1=81=D0=BA=D0=B0=20=D0=BE=D0=B1=D1=80=D0=B0?= =?UTF-8?q?=D0=B1=D0=BE=D1=82=D1=87=D0=B8=D0=BA=D0=B0=20=D0=B8=20=D0=BF?= =?UTF-8?q?=D0=B5=D1=80=D0=B5=D1=87=D0=B8=D1=82=D1=8B=D0=B2=D0=B0=D0=B5?= =?UTF-8?q?=D0=BC=20=D0=BD=D0=BE=D0=B2=D0=BE=D0=B5=20=D1=81=D0=BE=D1=81?= =?UTF-8?q?=D1=82=D0=BE=D1=8F=D0=BD=D0=B8=D0=B5=20=D0=BF=D0=BE=D0=B7=D0=B8?= =?UTF-8?q?=D1=86=D0=B8=D0=B8=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B8?= =?UTF-8?q?=20=D0=BF=D0=BE=20=D0=B7=D0=B0=D0=B2=D0=B5=D1=80=D1=88=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D1=8E=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D1=87=D0=B8=D0=BA=D0=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue.js | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/core/out_queue.js b/core/out_queue.js index 2b48ca6..415bbd5 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -13,7 +13,7 @@ const ChildProcess = require("child_process"); //Работа с дочерни const { ServerError } = require("./server_errors"); //Типовая ошибка const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции -const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса @@ -193,12 +193,14 @@ class OutQueue extends EventEmitter { if (this.nWorkersLeft > 0) { //Переопределим себя для обращения внутри обработчиков событий const self = this; + //Запоминаем текущее количество попыток обработки + const nQueueOldExecCnt = prms.queue.nExecCnt; //Создаём новый обработчик сообщений const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); - //Текущее состояние сообщения - let curQueue = null; //Перехват сообщений обработчика proc.on("message", async result => { + //Считываем сообщение изменённое обработчиком + prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId }); //Проверяем структуру полученного сообщения let sCheckResult = validateObject( result, @@ -207,7 +209,8 @@ class OutQueue extends EventEmitter { ); //Если структура сообщения в норме if (!sCheckResult) { - if (result.sResult == "ERR") { + //Анализируем результат обработки + if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) { //Фиксируем ошибку обработки - протокол работы сервиса await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, { nQueueId: prms.queue.nId @@ -216,13 +219,17 @@ class OutQueue extends EventEmitter { await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: result.sMsg, - nIncExecCnt: NINC_EXEC_CNT_YES, + nIncExecCnt: + nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nExecState: - prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + (nQueueOldExecCnt == prms.queue.nExecCnt + ? prms.queue.nExecCnt + 1 + : prms.queue.nExecCnt) < prms.queue.nRetryAttempts ? prms.queue.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); } + //Если есть контекст для сервиса - сохраним его для дальнейшего использования } else { //Пришел неожиданный ответ обработчика - запись в протокол работы сервиса await self.logger.error( @@ -233,9 +240,11 @@ class OutQueue extends EventEmitter { await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, - nIncExecCnt: NINC_EXEC_CNT_YES, + nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nExecState: - prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + (nQueueOldExecCnt == prms.queue.nExecCnt + ? prms.queue.nExecCnt + 1 + : prms.queue.nExecCnt) < prms.queue.nRetryAttempts ? prms.queue.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); @@ -253,6 +262,8 @@ class OutQueue extends EventEmitter { }); //Перехват ошибок обработчика proc.on("error", async e => { + //Считываем сообщение изменённое обработчиком + prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId }); //Фиксируем ошибку в протоколе работы await self.logger.error(`Ошибка обработки исходящего сообщения: ${makeErrorText(e)}`, { nQueueId: prms.queue.nId @@ -261,9 +272,10 @@ class OutQueue extends EventEmitter { await this.dbConn.setQueueState({ nQueueId: prms.queue.nId, sExecMsg: makeErrorText(e), - nIncExecCnt: NINC_EXEC_CNT_YES, + nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nExecState: - prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts + (nQueueOldExecCnt == prms.queue.nExecCnt ? prms.queue.nExecCnt + 1 : prms.queue.nExecCnt) < + prms.queue.nRetryAttempts ? prms.queue.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); @@ -374,7 +386,7 @@ class OutQueue extends EventEmitter { } //Ждем завершения работы всех обработчиков let i = setInterval(() => { - if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) { + if (!this.bWorking && this.nWorkersLeft >= this.outGoing.nMaxWorkers) { clearInterval(i); this.notifyStopped(); }