From 77a113238b27e1214ca1ea50f89b7f94770ef286 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Mon, 7 Jan 2019 01:46:22 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D1=85=D0=BE=D0=B4=20?= =?UTF-8?q?=D0=BD=D0=B0=20=D0=BC=D0=BE=D0=B4=D1=83=D0=BB=D1=8C=20=D1=81=20?= =?UTF-8?q?=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D1=8C=D1=8E=20=D1=80=D0=B0?= =?UTF-8?q?=D1=81=D1=81=D1=8B=D0=BB=D0=BA=D0=B8=20=D1=83=D0=B2=D0=B5=D0=B4?= =?UTF-8?q?=D0=BE=D0=BC=D0=BB=D0=B5=D0=BD=D0=B8=D0=B9,=20=D1=83=D1=82?= =?UTF-8?q?=D0=BE=D1=87=D0=BD=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=B0=D0=BB=D0=B3?= =?UTF-8?q?=D0=BE=D1=80=D0=B8=D1=82=D0=BC=D0=BE=D0=B2=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D0=B2=D0=B5=D1=80=D0=BA=D0=B8=20=D0=B4=D0=BE=D1=81=D1=82=D1=83?= =?UTF-8?q?=D0=BF=D0=BD=D0=BE=D1=81=D1=82=D0=B8=20=D1=83=D0=B4=D0=B0=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=BD=D1=8B=D1=85=20=D1=81=D0=B5=D1=80=D0=B2=D0=B8?= =?UTF-8?q?=D1=81=D0=BE=D0=B2=20(=D0=B2=20=D1=80=D0=B0=D1=81=D1=87=D1=91?= =?UTF-8?q?=D1=82=20=D0=B1=D0=B5=D1=80=D0=B5=D0=BC=20=D1=82=D0=BE=D0=BB?= =?UTF-8?q?=D1=8C=D0=BA=D0=BE=20=D1=81=D0=B5=D1=80=D0=B2=D0=B5=D1=80=D0=BD?= =?UTF-8?q?=D1=8B=D0=B5=20=D0=BE=D1=88=D0=B8=D0=B1=D0=BA=D0=B8=205xx,=20?= =?UTF-8?q?=D1=82=D0=B5=D0=BF=D0=B5=D1=80=D1=8C=20=D0=B5=D1=81=D1=82=D1=8C?= =?UTF-8?q?=20=D1=82=D0=B0=D0=B9=D0=BC=D0=B0=D1=83=D1=82=20=D0=BF=D1=80?= =?UTF-8?q?=D0=BE=D0=B2=D0=B5=D1=80=D0=BA=D0=B8,=20=D1=87=D1=82=D0=BE?= =?UTF-8?q?=D0=B1=D1=8B=20=D0=BD=D0=B5=20=D0=BF=D0=BE=D0=B4=D0=B2=D0=B8?= =?UTF-8?q?=D1=81=D0=B0=D0=BB=D0=BE=20=D0=B2=20=D1=81=D0=BB=D1=83=D1=87?= =?UTF-8?q?=D0=B0=D0=B5=20=D0=BA=D0=BE=D1=80=D1=8F=D0=B2=D1=8B=D1=85=20?= =?UTF-8?q?=D0=B0=D0=B4=D1=80=D0=B5=D1=81=D0=BE=D0=B2=20=D1=83=D0=B4=D0=B0?= =?UTF-8?q?=D0=BB=D0=B5=D0=BD=D0=BD=D1=8B=D1=85=20=D1=81=D0=B5=D1=80=D0=B2?= =?UTF-8?q?=D0=B5=D1=80=D0=BE=D0=B2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/app.js | 47 ++++++++++++++--- core/service_available_controller.js | 58 +++++++++++++-------- index.js | 2 +- models/prms_service_available_controller.js | 12 +++-- 4 files changed, 83 insertions(+), 36 deletions(-) diff --git a/core/app.js b/core/app.js index 938ee77..477000d 100644 --- a/core/app.js +++ b/core/app.js @@ -12,6 +12,7 @@ const db = require("./db_connector"); //Взаимодействие с БД const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений const iq = require("./in_queue"); //Прослушивание очереди входящих сообщений const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов +const ntf = require("./notifier"); //Отправка уведомлений const { ServerError } = require("./server_errors"); //Типовая ошибка const { makeErrorText, validateObject, getIPs } = require("./utils"); //Вспомогательные функции const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы @@ -35,6 +36,8 @@ class ParusAppServer { this.inQ = null; //Контроллер доступности удалённых сервисов this.srvAvlCtrl = null; + //Модуль отправки уведомлений + this.notifier = null; //Флаг остановки сервера this.bStopping = false; //Список обслуживаемых сервисов @@ -46,6 +49,8 @@ class ParusAppServer { this.onOutQStopped = this.onOutQStopped.bind(this); this.onInQStarted = this.onInQStarted.bind(this); this.onInQStopped = this.onInQStopped.bind(this); + this.onNotifierStarted = this.onNotifierStarted.bind(this); + this.onNotifierStopped = this.onNotifierStopped.bind(this); this.onServiceACStarted = this.onServiceACStarted.bind(this); this.onServiceACStopped = this.onServiceACStopped.bind(this); } @@ -111,6 +116,29 @@ class ParusAppServer { await this.logger.info( `Обработчик очереди входящих сообщений запущен (порт - ${nPort}, доступные IP - ${getIPs().join("; ")})` ); + //Запускаем модуль отправки уведомлений + await this.logger.info("Запуск модуля отправки уведомлений..."); + try { + this.notifier.startNotifier(); + } catch (e) { + await this.logger.error(`Ошибка запуска модуля отправки уведомлений: ${makeErrorText(e)}`); + await this.stop(); + return; + } + } + //При останове обработчика входящих сообщений + async onInQStopped() { + //Сообщим, что остановили обработчик + await this.logger.warn("Обработчик очереди входящих сообщений остановлен"); + //Останавливаем модуль отправки уведомлений + await this.logger.warn("Останов модуля отправки уведомлений..."); + if (this.notifier) this.notifier.stopNotifier(); + else await this.onNotifierStopped(); + } + //При запуске модуля отправки уведомлений + async onNotifierStarted() { + //Сообщим, что запустили модуль + await this.logger.info(`Модуль отправки уведомлений запущен`); //Запускаем контроллер доступности удалённых сервисов await this.logger.info("Запуск контроллера доступности удалённых сервисов..."); try { @@ -121,10 +149,10 @@ class ParusAppServer { return; } } - //При останове обработчика входящих сообщений - async onInQStopped() { - //Сообщим, что остановили обработчик - await this.logger.warn("Обработчик очереди входящих сообщений остановлен"); + //При останове модуля отправки уведомлений + async onNotifierStopped() { + //Сообщим, что остановили модуль + await this.logger.warn("Модуль отправки уведомлений остановлен"); //Останавливаем контроллер доступности удалённных сервисов await this.logger.warn("Останов контроллера доступности удалённых сервисов..."); if (this.srvAvlCtrl) this.srvAvlCtrl.stopController(); @@ -172,8 +200,10 @@ class ParusAppServer { this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); //Создаём обработчик очереди входящих this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger }); - //Создаём контроллер доступности удалённых сервислв - this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail }); + //Создаём модуль рассылки уведомлений + this.notifier = new ntf.Notifier({ logger: this.logger, mail: prms.config.mail }); + //Создаём контроллер доступности удалённых сервисов + this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, notifier: this.notifier }); //Скажем что инициализировали await this.logger.info("Сервер приложений инициализирован"); } else { @@ -197,6 +227,9 @@ class ParusAppServer { //Включим прослушивание событий обработчика входящих сообщений this.inQ.on(iq.SEVT_IN_QUEUE_STARTED, this.onInQStarted); this.inQ.on(iq.SEVT_IN_QUEUE_STOPPED, this.onInQStopped); + //Включим прослушивание событий модуля рассылки уведомлений + this.notifier.on(ntf.SEVT_NOTIFIER_STARTED, this.onNotifierStarted); + this.notifier.on(ntf.SEVT_NOTIFIER_STOPPED, this.onNotifierStopped); //Включим прослушивание событий контроллера доступности удалённых сервисов this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted); this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped); @@ -209,7 +242,7 @@ class ParusAppServer { if (!this.bStopping) { //Установим флаг - остановка в процессе this.bStopping = true; - //Сообщаем, что начала останов сервера + //Сообщаем, что начался останов сервера await this.logger.warn("Останов сервера приложений..."); //Останов обслуживания очереди исходящих await this.logger.warn("Останов обработчика очереди исходящих сообщений..."); diff --git a/core/service_available_controller.js b/core/service_available_controller.js index bd272be..21c6915 100644 --- a/core/service_available_controller.js +++ b/core/service_available_controller.js @@ -12,7 +12,7 @@ const rqp = require("request-promise"); //Работа с HTTP/HTTPS запро const EventEmitter = require("events"); //Обработчик пользовательских событий const { ServerError } = require("./server_errors"); //Типовая ошибка const { SERR_SERVICE_UNAVAILABLE, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы -const { makeErrorText, validateObject, sendMail } = require("./utils"); //Вспомогательные функции +const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции const prmsServiceAvailableControllerSchema = require("../models/prms_service_available_controller"); //Схемы валидации параметров функций класса const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервисов @@ -40,7 +40,7 @@ class ServiceAvailableController extends EventEmitter { constructor(prms) { //Создадим экземпляр родительского класса super(); - //Проверяем структуру переданного объекта для подключения + //Проверяем структуру переданного набора параметров для конструктора let sCheckResult = validateObject( prms, prmsServiceAvailableControllerSchema.ServiceAvailableController, @@ -58,17 +58,19 @@ class ServiceAvailableController extends EventEmitter { this.bInDetectingLoop = false; //Идентификатор таймера проверки доступности сервисов this.nDetectingLoopTimeOut = null; - //Запомним параметры отправки E-Mail - this.mail = prms.mail; + //Запомним уведомитель + this.notifier = prms.notifier; //Запомним логгер this.logger = prms.logger; + //Установим таймаут проведки адреса сервиса (мс) + this.nCheckTimeout = 10000; //Привяжем методы к указателю на себя для использования в обработчиках событий this.serviceDetectingLoop = this.serviceDetectingLoop.bind(this); } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } - //Уведомление об запуске контроллера + //Уведомление о запуске контроллера notifyStarted() { //Оповестим подписчиков о запуске this.emit(SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED); @@ -86,15 +88,15 @@ class ServiceAvailableController extends EventEmitter { await this.serviceDetectingLoop(); }, NDETECTING_LOOP_INTERVAL); } else { - //Если мы не работаем и просили оповстить об останове (видимо была команда на останов) - сделаем это + //Если мы не работаем и просили оповестить об останове (видимо была команда на останов) - сделаем это if (this.bNotifyStopped) this.notifyStopped(); } } - //Опрос очереди исходящих сообщений + //Опрос доступности сервисов async serviceDetectingLoop() { //Если есть сервисы для опроса if (this.services && Array.isArray(this.services) && this.services.length > 0) { - //Выставим флаг - цикл опроса кативен + //Выставим флаг - цикл опроса активен this.bInDetectingLoop = true; try { //Обходим список сервисов для проверки @@ -106,7 +108,7 @@ class ServiceAvailableController extends EventEmitter { ) { try { //Отправляем проверочный запрос - await rqp({ url: this.services[i].sSrvRoot }); + await rqp({ url: this.services[i].sSrvRoot, timeout: this.nCheckTimeout }); //Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности this.services[i].dAvailable = new Date(); this.services[i].dUnAvailable = null; @@ -116,20 +118,31 @@ class ServiceAvailableController extends EventEmitter { //Сформируем текст ошибки в зависимости от того, что случилось let sError = "Неожиданная ошибка удалённого сервиса"; if (e.error) { - sError = `Ошибка передачи данных: ${e.error.code}`; + let sSubError = e.error.code || e.error; + if (e.error.code === "ESOCKETTIMEDOUT") + sSubError = `сервис не ответил на запрос в течение ${this.nCheckTimeout} мс`; + sError = `Ошибка передачи данных: ${sSubError}`; } if (e.response) { - sError = `Ошибка работы удалённого сервиса: ${e.response.statusCode} - ${ - e.response.statusMessage - }`; + //Нам нужны только ошибки сервера + if (String(e.response.statusCode).startsWith("5")) { + sError = `Ошибка работы удалённого сервиса: ${e.response.statusCode} - ${ + e.response.statusMessage + }`; + } else { + //Остальное - клиентские ошибки, но сервер-то вроде отвечает, поэтому - пропускаем + this.services[i].dUnAvailable = null; + } + } + //Фиксируем ошибку проверки в протоколе (только если она действительно была) + if (this.services[i].dUnAvailable) { + await this.logger.warn( + `При проверке доступности сервиса ${this.services[i].sCode}: ${makeErrorText( + new ServerError(SERR_SERVICE_UNAVAILABLE, sError) + )} (адрес - ${this.services[i].sSrvRoot})`, + { nServiceId: this.services[i].nId } + ); } - //Фиксируем ошибку проверки в протоколе - await this.logger.warn( - `При проверке доступности сервиса ${this.services[i].sCode}: ${makeErrorText( - new ServerError(SERR_SERVICE_UNAVAILABLE, sError) - )} (адрес - ${this.services[i].sSrvRoot})`, - { nServiceId: this.services[i].nId } - ); } //Если есть даты - будем проверять if (this.services[i].dUnAvailable && this.services[i].dAvailable) { @@ -151,8 +164,7 @@ class ServiceAvailableController extends EventEmitter { //И в почту, если есть список адресов if (this.services[i].sUnavlblNtfMail) { try { - await sendMail({ - mail: this.mail, + this.notifier.addMessage({ sTo: this.services[i].sUnavlblNtfMail, sSubject, sMessage @@ -204,7 +216,7 @@ class ServiceAvailableController extends EventEmitter { s.dUnAvailable = null; s.dAvailable = new Date(); }); - //Начинаем слушать очередь исходящих + //Начинаем проверять список сервисов setTimeout(this.serviceDetectingLoop, NDETECTING_LOOP_DELAY); //И оповещаем всех что запустились this.notifyStarted(); diff --git a/index.js b/index.js index 2d3b7af..0d99138 100644 --- a/index.js +++ b/index.js @@ -73,7 +73,7 @@ const start = async () => { await appSrv.run(); } catch (e) { //Если есть ошибки с которыми сервер не справился - ловим их, показываем... - appSrv.logger.error(makeErrorText(e)); + await appSrv.logger.error(makeErrorText(e)); //...и пытаемся остановить сервер нормально try { await appSrv.stop(); diff --git a/models/prms_service_available_controller.js b/models/prms_service_available_controller.js index c61bee9..fcae53a 100644 --- a/models/prms_service_available_controller.js +++ b/models/prms_service_available_controller.js @@ -9,7 +9,7 @@ const Schema = require("validate"); //Схемы валидации const { defServices } = require("./obj_services"); //Схема валидации списка сервисов -const { mail } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений +const { Notifier } = require("../core/notifier"); //Класс рассылки уведомлений const { Logger } = require("../core/logger"); //Класс для протоколирования работы //------------------ @@ -18,12 +18,14 @@ const { Logger } = require("../core/logger"); //Класс для протоко //Схема валидации параметров конструктора exports.ServiceAvailableController = new Schema({ - //Параметры отправки E-Mail уведомлений - mail: { - schema: mail, + //Объект для рассылки уведомлений + notifier: { + type: Notifier, required: true, message: { - required: path => `Не указаны параметры отправки E-Mail уведомлений (${path})` + type: path => + `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`, + required: path => `Не указан объект для рассылки уведомлений (${path})` } }, //Объект для протоколирования работы