From 2e789aeea7c11111ea103f9ac4e10eaa4056ee73 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Sun, 2 Dec 2018 01:32:36 +0300 Subject: [PATCH] =?UTF-8?q?=D0=92=D0=B0=D0=BB=D0=B8=D0=B4=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D1=8F=20=D0=BF=D0=B0=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80?= =?UTF-8?q?=D0=BE=D0=B2=20=D0=B4=D0=BB=D1=8F=20=D1=84=D1=83=D0=BD=D0=BA?= =?UTF-8?q?=D1=86=D0=B8=D0=B8=20=D0=B7=D0=B0=D0=BF=D1=83=D1=81=D0=BA=D0=B0?= =?UTF-8?q?=20=D0=BE=D0=B1=D1=81=D0=BB=D1=83=D0=B6=D0=B8=D0=B2=D0=B0=D0=BD?= =?UTF-8?q?=D0=B8=D1=8F=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B8=20?= =?UTF-8?q?=D0=B8=D1=81=D1=85=D0=BE=D0=B4=D1=8F=D1=89=D0=B8=D1=85=20(start?= =?UTF-8?q?Processing),=20=D1=84=D1=83=D0=BD=D0=BA=D1=86=D0=B8=D0=B8=20?= =?UTF-8?q?=D0=BE=D1=82=D0=BF=D1=80=D0=B0=D0=B2=D0=BA=D0=B8=20=D0=BE=D1=87?= =?UTF-8?q?=D0=B5=D1=80=D0=B5=D0=B4=D0=BD=D0=BE=D0=B3=D0=BE=20=D1=81=D0=BE?= =?UTF-8?q?=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BD=D0=B0=20?= =?UTF-8?q?=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=BA=D1=83=20(proce?= =?UTF-8?q?ssMessage),=20=D0=B8=D1=81=D0=BF=D1=80=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B0=20=D0=BE=D1=88=D0=B8=D0=B1=D0=BA=D0=B0=20=D0=B2?= =?UTF-8?q?=D0=B0=D0=BB=D0=B8=D0=B4=D0=B0=D1=86=D0=B8=D0=B8=20=D0=BF=D0=B0?= =?UTF-8?q?=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D0=BE=D0=B2=20=D0=B8=D0=BD?= =?UTF-8?q?=D0=B8=D1=86=D0=B8=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0=D1=86=D0=B8?= =?UTF-8?q?=D0=B8=20(outGoing)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue.js | 222 ++++++++++++++++++++++----------------- models/prms_out_queue.js | 24 ++++- 2 files changed, 146 insertions(+), 100 deletions(-) diff --git a/core/out_queue.js b/core/out_queue.js index c6b6807..00bc4d5 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -13,7 +13,7 @@ const ChildProcess = require("child_process"); //Работа с дочерни const { ServerError } = require("./server_errors"); //Типовая ошибка const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const { validateObject } = require("./utils"); //Вспомогательные функции -const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса @@ -46,6 +46,8 @@ class OutQueue extends EventEmitter { let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue"); //Если структура объекта в норме if (!sCheckResult) { + //Список обслуживаемых сервисов + this.services = null; //Признак функционирования обработчика this.bWorking = false; //Параметры очереди @@ -79,108 +81,118 @@ class OutQueue extends EventEmitter { worker.kill(); this.nWorkersLeft++; } - //Запуск обработчки очередного сообщения - processMessage(message) { - //Проверим, что есть доступные обработчики - if (this.nWorkersLeft > 0) { - //Переопределим себя для обращения внутри обработчиков событий - const self = this; - //Создаём новый обработчик сообщений - const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); - //Перехват сообщений обработчика - proc.on("message", async result => { - //Проверяем структуру полученного сообщения - let sCheckResult = validateObject( - result, - objOutQueueProcessorSchema.OutQueueProcessorTaskResult, - "Ответ обработчика очереди исходящих сообщений" - ); - //Если структура сообщения в норме - if (!sCheckResult) { - //Если обработчик вернул ошибку - if (result.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR) { - //Установим ошибочный статус в БД для сообщений и увеличим счетчик попыток отправки - console.log("ЖДЕМ ЗАПИСЬ В БД111"); + //Запуск обработки очередного сообщения + processMessage(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.processMessage, + "Параметры функции запуска обработки очередного сообщения" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Проверим, что есть доступные обработчики + if (this.nWorkersLeft > 0) { + //Переопределим себя для обращения внутри обработчиков событий + const self = this; + //Создаём новый обработчик сообщений + const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); + //Перехват сообщений обработчика + proc.on("message", async result => { + //Проверяем структуру полученного сообщения + let sCheckResult = validateObject( + result, + objOutQueueProcessorSchema.OutQueueProcessorTaskResult, + "Ответ обработчика очереди исходящих сообщений" + ); + //Если структура сообщения в норме + if (!sCheckResult) { + //Если обработчик вернул ошибку + if (result.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR) { + //Установим ошибочный статус в БД для сообщений и увеличим счетчик попыток отправки + await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: result.sExecMsg, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: result.nExecState + }); + //Фиксируем ошибку в протоколе работы сервиса + await self.logger.error( + `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ` + + result.sExecMsg, + { + nQueueId: prms.queue.nId + } + ); + } else { + //Пишем в базу успех + await self.dbConn.setQueueState({ + nQueueId: prms.queue.nId, + sExecMsg: result.sExecMsg, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: result.nExecState + }); + //Фиксируем успех в протоколе работы сервиса + await self.logger.info( + `Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, + { + nQueueId: prms.queue.nId + } + ); + } + } else { + //Пришел неожиданный ответ обработчика, установим статус в БД - ошибка обработки сервером приложений await self.dbConn.setQueueState({ - nQueueId: message.nId, - sExecMsg: result.sExecMsg, + nQueueId: prms.queue.nId, + sExecMsg: sCheckResult, nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: result.nExecState + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR }); //Фиксируем ошибку в протоколе работы сервиса await self.logger.error( - "Ошибка обработки исходящего сообщения сервером приложений: " + result.sExecMsg, + `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`, { - nQueueId: message.nId + nQueueId: prms.queue.nId } ); - console.log("ДОЖДАЛИСЬ111"); - } else { - //Пишем в базу успех - console.log("ЖДЕМ ЗАПИСЬ В БД222"); - await self.dbConn.setQueueState({ - nQueueId: message.nId, - sExecMsg: result.sExecMsg, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: result.nExecState - }); - //Фиксируем успех в протоколе работы сервиса - await self.logger.info("Исходящее сообщение успешно отработано сервером приложений", { - nQueueId: message.nId - }); - console.log("ДОЖДАЛИСЬ222"); } - } else { - //Пришел неожиданный ответ обработчика, установим статус в БД - ошибка обработки сервером приложений - console.log("ЖДЕМ ЗАПИСЬ В БД"); + self.stopMessageWorker(proc); + }); + //Перехват ошибок обработчика + proc.on("error", async e => { + //Установим его статус в БД - ошибка обработки сервером приложений await self.dbConn.setQueueState({ - nQueueId: message.nId, - sExecMsg: sCheckResult, + nQueueId: prms.queue.nId, + sExecMsg: e.message, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR }); - //Фиксируем ошибку в протоколе работы сервиса - await self.logger.error( - "Ошибка обработки исходящего сообщения сервером приложений: " + sCheckResult, - { - nQueueId: message.nId - } - ); - console.log("ДОЖДАЛИСЬ"); - } - console.log("ОСТАНОВ ОБРАБОТЧИКА"); - self.stopMessageWorker(proc); - }); - //Перехват ошибок обработчика - proc.on("error", async e => { - //Установим его статус в БД - ошибка обработки сервером приложений - await self.dbConn.setQueueState({ - nQueueId: message.nId, - sExecMsg: e.message, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + //Так же фиксируем ошибку в протоколе работы + await self.logger.error(`Ошибка обработки исходящего сообщения сервером приложений: ${e.message}`, { + nQueueId: prms.queue.nId + }); + //Завершим обработчик + self.stopMessageWorker(proc); }); - //Так же фиксируем ошибку в протоколе работы - await self.logger.error("Ошибка обработки исходящего сообщения сервером приложений: " + e.message, { - nQueueId: message.nId + //Перехват останова обработчика + proc.on("exit", code => {}); + //Запускаем обработчик + proc.send({ + nQueueId: prms.queue.nId, + service: {}, + function: {}, + blMsg: prms.queue.blMsg }); - //Завершим обработчик - self.stopMessageWorker(proc); - }); - //Перехват останова обработчика - proc.on("exit", code => {}); - //Запускаем обработчик - proc.send({ - queue: message, - service: {} - }); - //Уменьшаем количество доступных обработчиков - this.nWorkersLeft--; - //Вернем признак того, что сообщение обрабатывается - return true; + //Уменьшаем количество доступных обработчиков + this.nWorkersLeft--; + //Вернем признак того, что сообщение обрабатывается + return true; + } else { + //Вернем признак того, что сообщение не обрабатывается + return false; + } } else { - //Вернем признак того, что сообщение не обрабатывается - return false; + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Перезапуск опроса очереди исходящих сообщений @@ -205,10 +217,10 @@ class OutQueue extends EventEmitter { //Ставим их в очередь for (let i = 0; i < outMsgs.length; i++) { //Если сообщение успешно взято в обработку - if (this.processMessage(outMsgs[i])) { + if (this.processMessage({ queue: outMsgs[i] })) { //Скажем что оно у нас есть await this.logger.info( - "Новое исходящее сообщение: " + + "Исполняю отправку исходящего сообщения: " + outMsgs[i].nId + ", " + outMsgs[i].sInDate + @@ -216,8 +228,8 @@ class OutQueue extends EventEmitter { outMsgs[i].sServiceFnCode + ", " + outMsgs[i].sExecState + - ", " + - outMsgs[i].nExecCnt, + ", попытка исполнения - " + + (outMsgs[i].nExecCnt + 1), { nQueueId: outMsgs[i].nId } ); //Установим его статус в БД - обрабатывается сервером приложений @@ -233,7 +245,7 @@ class OutQueue extends EventEmitter { this.restartDetectingLoop(); } catch (e) { if (e instanceof ServerError) - await this.logger.error("При получении исходящего сообщения: " + e.sCode + ": " + e.sMessage); + await this.logger.error("При обработке исходящего сообщения: " + e.sCode + ": " + e.sMessage); else this.logger.error(SERR_UNEXPECTED + ": " + e.message); this.restartDetectingLoop(); } @@ -243,11 +255,26 @@ class OutQueue extends EventEmitter { } } //Запуск обработки очереди исходящих сообщений - startProcessing() { - //Выставляем флаг работы - this.bWorking = true; - setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY); - this.notifyStarted(); + startProcessing(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsOutQueueSchema.startProcessing, + "Параметры функции запуска обработки очереди исходящих сообщений" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Выставляем флаг работы + this.bWorking = true; + //запоминаем список обслуживаемых сервисов + this.services = prms.services; + //Начинаем слушать очередь исходящих + setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY); + //И оповещаем всех что запустились + this.notifyStarted(); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } } //Остановка обработки очереди исходящих сообщений stopProcessing() { @@ -260,7 +287,6 @@ class OutQueue extends EventEmitter { } //Ждем завершения работы всех обработчиков let i = setInterval(() => { - console.log(this.bWorking + " " + this.nWorkersLeft + " " + this.outGoing.nMaxWorkers); if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) { clearInterval(i); this.notifyStopped(); diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 8ebb313..b951f77 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -8,7 +8,9 @@ //---------------------- const Schema = require("validate"); //Схемы валидации -const { outgoing } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений +const { outGoing } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений +const { defServices } = require("./obj_services"); //Схема валидации списка сервисов +const { Queue } = require("./obj_queue"); //Схема валидации сообщения очереди const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД const { Logger } = require("../core/logger"); //Класс для протоколирования работы @@ -20,7 +22,7 @@ const { Logger } = require("../core/logger"); //Класс для протоко exports.OutQueue = new Schema({ //Параметры обработки очереди исходящих сообщений outGoing: { - schema: outgoing, + schema: outGoing, required: true, message: { required: "Не указаны параметры обработки очереди исходящих сообщений (outGoing)" @@ -45,3 +47,21 @@ exports.OutQueue = new Schema({ } } }); + +//Схема валидации параметров функции передачи исходящего сообшения на обработку +exports.processMessage = new Schema({ + //Обрабатываемое исходящее сообщение + queue: { + schema: Queue, + required: true, + message: { + required: "Не указано обрабатываемое исходящее сообщение (queue)" + } + } +}); + +//Схема валидации параметров функции запуска обслуживания очереди +exports.startProcessing = new Schema({ + //Список обслуживаемых сервисов + services: defServices(true, "services") +});