diff --git a/core/app.js b/core/app.js index 568833d..69c328f 100644 --- a/core/app.js +++ b/core/app.js @@ -10,8 +10,9 @@ const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений +const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов const { ServerError } = require("./server_errors"); //Типовая ошибка -const { validateObject } = require("./utils"); //Вспомогательные функции +const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const prmsAppSchema = require("../models/prms_app"); //Схема валидации параметров функций класса @@ -29,6 +30,8 @@ class ParusAppServer { this.dbConn = null; //Обработчик очереди исходящих this.outQ = null; + //Контроллер доступности удалённых сервисов + this.srvAvlCtrl = null; //Флаг остановки сервера this.bStopping = false; //Список обслуживаемых сервисов @@ -38,6 +41,8 @@ class ParusAppServer { this.onDBDisconnected = this.onDBDisconnected.bind(this); this.onOutQStarted = this.onOutQStarted.bind(this); this.onOutQStopped = this.onOutQStopped.bind(this); + this.onServiceACStarted = this.onServiceACStarted.bind(this); + this.onServiceACStopped = this.onServiceACStopped.bind(this); } //При подключении к БД async onDBConnected(connection) { @@ -50,7 +55,7 @@ class ParusAppServer { try { this.services = await this.dbConn.getServices(); } catch (e) { - await this.logger.error("Ошибка получения списка сервисов: " + e.sCode + ": " + e.sMessage); + await this.logger.error(`Ошибка получения списка сервисов: ${makeErrorText(e)}`); await this.stop(); return; } @@ -60,14 +65,10 @@ class ParusAppServer { try { this.outQ.startProcessing({ services: this.services }); } catch (e) { - await this.logger.error( - "Ошибка запуска обработчика очереди исходящих сообщений: " + e.sCode + ": " + e.sMessage - ); + await this.logger.error(`Ошибка запуска обработчика очереди исходящих сообщений: ${makeErrorText(e)}`); await this.stop(); return; } - //Рапортуем, что запустились - await this.logger.info("Сервер приложений запущен"); } //При отключении от БД async onDBDisconnected() { @@ -80,11 +81,37 @@ class ParusAppServer { async onOutQStarted() { //Сообщим, что запустили обработчик await this.logger.info("Обработчик очереди исходящих сообщений запущен"); + + //Запускаем обслуживание очереди исходящих + await this.logger.info("Запуск контроллера доступности удалённых сервисов..."); + try { + this.srvAvlCtrl.startController({ services: this.services }); + } catch (e) { + await this.logger.error(`Ошибка запуска контроллера доступности удалённых сервисов: ${makeErrorText(e)}`); + await this.stop(); + return; + } + //Рапортуем, что запустились + await this.logger.info("Сервер приложений запущен"); } //При останове обработчика исходящих сообщений async onOutQStopped() { //Сообщим, что остановили обработчик await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); + //Останавливаем контроллер доступности удалённных сервисов + await this.logger.warn("Останов контроллера доступности удалённых сервисов..."); + if (this.srvAvlCtrl) this.srvAvlCtrl.stopController(); + else await this.onServiceACStopped(); + } + //При запуске контроллера доступности удаленных сервисов + async onServiceACStarted() { + //Сообщим, что запустили обработчик + await this.logger.info("Контроллер доступности удалённых сервисов запущен"); + } + //При останове контроллера доступности удаленных сервисов + async onServiceACStopped() { + //Сообщим, что остановили обработчик + await this.logger.warn("Контроллер доступности удалённых сервисов остановлен"); //Отключение от БД if (this.dbConn) { if (this.dbConn.bConnected) { @@ -114,6 +141,8 @@ class ParusAppServer { this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); //Создаём обработчик очереди исходящих this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); + //Создаём контроллер доступности удалённых сервислв + this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail }); //Скажем что инициализировали await this.logger.info("Сервер приложений инициализирован"); } else { @@ -134,6 +163,9 @@ class ParusAppServer { //Включим прослушивание событий обработчика исходящих сообщений this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted); this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped); + //Включим прослушивание событий контроллера доступности удалённых сервисов + this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted); + this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped); //Подключаемся к БД await this.logger.info("Подключение сервера приложений к БД..."); await this.dbConn.connect(); @@ -148,7 +180,7 @@ class ParusAppServer { //Останов обслуживания очереди исходящих await this.logger.warn("Останов обработчика очереди исходящих сообщений..."); if (this.outQ) this.outQ.stopProcessing(); - else this.onOutQStopped(); + else await this.onOutQStopped(); } } } diff --git a/core/service_available_controller.js b/core/service_available_controller.js new file mode 100644 index 0000000..16f41b7 --- /dev/null +++ b/core/service_available_controller.js @@ -0,0 +1,287 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Модуль ядра: контроль доступности сервисов +*/ + +//------------------------------ +// Подключение внешних библиотек +//------------------------------ + +const _ = require("lodash"); //Работа с массивами и коллекциями +const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами +const EventEmitter = require("events"); //Обработчик пользовательских событий +const nodemailer = require("nodemailer"); //Отправка E-Mail сообщений +const { ServerError } = require("./server_errors"); //Типовая ошибка +const { SERR_SERVICE_UNAVAILABLE, SERR_MAIL_FAILED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы +const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции +const prmsServiceAvailableControllerSchema = require("../models/prms_service_available_controller"); //Схемы валидации параметров функций класса +const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервисов + +//-------------------------- +// Глобальные идентификаторы +//-------------------------- + +//Типовые события +const SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED = "SERVICE_AVAILABLE_CONTROLLER_STARTED"; //Контроллер доступности сервисов запущен +const SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED = "SERVICE_AVAILABLE_CONTROLLER_STOPPED"; //Контроллер доступности сервисов остановлен + +//Время отложенного старта цикла контроля (мс) +const NDETECTING_LOOP_DELAY = 3000; + +//Интервал проверки доступности сервисов (мс) +const NDETECTING_LOOP_INTERVAL = 60000; + +//------------ +// Тело модуля +//------------ + +//Класс контроллера доступности сервисов +class ServiceAvailableController extends EventEmitter { + //Конструктор класса + constructor(prms) { + //Создадим экземпляр родительского класса + super(); + //Проверяем структуру переданного объекта для подключения + let sCheckResult = validateObject( + prms, + prmsServiceAvailableControllerSchema.ServiceAvailableController, + "Параметры конструктора класса ServiceAvailableController" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Список обслуживаемых сервисов + this.services = null; + //Признак функционирования контроллера + this.bWorking = false; + //Признак необходимости оповещения об останове + this.bNotifyStopped = false; + //Флаг работы цикла проверки + this.bInDetectingLoop = false; + //Идентификатор таймера проверки доступности сервисов + this.nDetectingLoopTimeOut = null; + //Запомним параметры отправки E-Mail + this.mail = prms.mail; + //Запомним логгер + this.logger = prms.logger; + //Привяжем методы к указателю на себя для использования в обработчиках событий + this.serviceDetectingLoop = this.serviceDetectingLoop.bind(this); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Уведомление об запуске контроллера + notifyStarted() { + //Оповестим подписчиков о запуске + this.emit(SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED); + } + //Уведомление об остановке контроллера + notifyStopped() { + //Оповестим подписчиков об останове + this.emit(SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED); + } + //Отправка E-Mail уведомления о недоступности сервиса + sendUnAvailableMail(prms) { + return new Promise((resolve, reject) => { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsServiceAvailableControllerSchema.sendUnAvailableMail, + "Параметры функции отправки E-Mail уведомления о недоступности удалённого сервиса" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Параметры подключения к SMTP-серверу + let transporter = nodemailer.createTransport({ + host: this.mail.sHost, + port: this.mail.nPort, + secure: this.mail.nPort == 465, + auth: { + user: this.mail.sUser, + pass: this.mail.sPass + } + }); + //Параметры отправляемого сообщения + let mailOptions = { + from: this.mail.sFrom, + to: prms.sTo, + subject: prms.sSubject, + text: prms.sMessage + }; + //Отправляем сообщение + transporter.sendMail(mailOptions, (error, info) => { + if (error) { + reject(new ServerError(SERR_MAIL_FAILED, `${error.code}: ${error.response}`)); + } else { + if (info.rejected && Array.isArray(info.rejected) && info.rejected.length > 0) { + reject( + new ServerError( + SERR_MAIL_FAILED, + `Сообщение не доствлено адресатам: ${info.rejected.join(", ")}` + ) + ); + } else { + resolve(info); + } + } + }); + } else { + reject(new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult)); + } + }); + } + //Перезапуск опроса списка сервисов + async restartDetectingLoop() { + //Включаем опрос сервисов только если установлен флаг работы + if (this.bWorking) { + this.nDetectingLoopTimeOut = await setTimeout(async () => { + await this.serviceDetectingLoop(); + }, NDETECTING_LOOP_INTERVAL); + } else { + //Если мы не работаем и просили оповстить об останове (видимо была команда на останов) - сделаем это + if (this.bNotifyStopped) this.notifyStopped(); + } + } + //Опрос очереди исходящих сообщений + async serviceDetectingLoop() { + //Если есть сервисы для опроса + if (this.services && Array.isArray(this.services) && this.services.length > 0) { + //Выставим флаг - цикл опроса кативен + this.bInDetectingLoop = true; + try { + //Обходим список сервисов для проверки + for (let i = 0; i < this.services.length; i++) { + //Если сервис надо проверять на доступность и это сервис для отправки исходящих сообщений + if ( + this.services[i].nUnavlblNtfSign == objServiceSchema.NUNAVLBL_NTF_SIGN_YES && + this.services[i].nSrvType == objServiceSchema.NSRV_TYPE_SEND + ) { + try { + //Отправляем проверочный запрос + await rqp({ url: this.services[i].sSrvRoot }); + //Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности + this.services[i].dAvailable = new Date(); + this.services[i].dUnAvailable = null; + } catch (e) { + //Зафиксируем дату и время недоступности + this.services[i].dUnAvailable = new Date(); + //Сформируем текст ошибки в зависимости от того, что случилось + let sError = "Неожиданная ошибка удалённого сервиса"; + if (e.error) { + sError = `Ошибка передачи данных: ${e.error.code}`; + } + if (e.response) { + sError = `Ошибка работы удалённого сервиса: ${e.response.statusCode} - ${ + e.response.statusMessage + }`; + } + //Фиксируем ошибку проверки в протоколе + await this.logger.warn( + `При проверке доступности сервиса ${this.services[i].sCode}: ${makeErrorText( + new ServerError(SERR_SERVICE_UNAVAILABLE, sError) + )} (адрес - ${this.services[i].sSrvRoot})`, + { nServiceId: this.services[i].nId } + ); + } + //Если есть даты - будем проверять + if (this.services[i].dUnAvailable && this.services[i].dAvailable) { + //Выясним как долго он уже недоступен (в минутах) + let nDiffMs = this.services[i].dUnAvailable - this.services[i].dAvailable; + let nDiffMins = Math.round(((nDiffMs % 86400000) % 3600000) / 60000); + //Если простой больше указанного в настройках - будем оповещать по почте + if (nDiffMins >= this.services[i].nUnavlblNtfTime) { + let sSubject = `Удалённый сервис ${this.services[i].sCode} неотвечает на запросы`; + let sMessage = `Сервис недоступен более ${ + this.services[i].nUnavlblNtfTime + } мин. (${nDiffMins} мин. с момента запуска сервера приложений).\nАдрес сервиса: ${ + this.services[i].sSrvRoot + }`; + await this.logger.error(sMessage, { nServiceId: this.services[i].nId }); + try { + await this.sendUnAvailableMail({ + sTo: this.services[i].sUnavlblNtfMail, + sSubject, + sMessage + }); + } catch (e) { + await this.logger.error(makeErrorText(e), { + nServiceId: this.services[i].nId + }); + } + } + } + } + } + } catch (e) { + //Фиксируем ошибку в протоколе работы сервера приложений + await this.logger.error(makeErrorText(e)); + } + //Выставим флаг - цикл опроса неактивен + this.bInDetectingLoop = false; + //Перезапускаем опрос + await this.restartDetectingLoop(); + } else { + //Выставим флаг - цикл опроса неактивен + this.bInDetectingLoop = false; + //Опрашивать нечего - ждём и перезапускаем цикл опроса + await this.restartDetectingLoop(); + } + } + //Запуск контроллера + startController(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsServiceAvailableControllerSchema.startController, + "Параметры функции запуска контроллера доступности сервисов" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Выставляем флаг работы + this.bWorking = true; + //Выставляем флаг необходимости оповещения об останове + this.bNotifyStopped = false; + //Выставляем флаг неактивности (пока) цикла опроса + this.bInDetectingLoop = false; + //запоминаем список обслуживаемых сервисов и инициализируем даты доступности и недоступности + this.services = _.cloneDeep(prms.services); + this.services.forEach(s => { + s.dUnAvailable = null; + s.dAvailable = new Date(); + }); + //Начинаем слушать очередь исходящих + setTimeout(this.serviceDetectingLoop, NDETECTING_LOOP_DELAY); + //И оповещаем всех что запустились + this.notifyStarted(); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Остановка контроллера + stopController() { + //Выставляем флаг неработы + this.bWorking = false; + //Если сейчас мы не в цикле проверки + if (!this.bInDetectingLoop) { + //Сбросим его таймер, чтобы он не запустился снова + if (this.nDetectingLoopTimeOut) { + clearTimeout(this.nDetectingLoopTimeOut); + this.nDetectingLoopTimeOut = null; + } + //Выставляем флаг - не надо оповещать об останове + this.bNotifyStopped = false; + //Оповестим об останове + this.notifyStopped(); + } else { + //Выставляем флаг необходимости оповещения об останове (это будет сделано автоматически по завершению цикла проверки) + this.bNotifyStopped = true; + } + } +} + +//----------------- +// Интерфейс модуля +//----------------- + +exports.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED = SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED; +exports.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED = SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED; +exports.ServiceAvailableController = ServiceAvailableController; diff --git a/models/prms_service_available_controller.js b/models/prms_service_available_controller.js new file mode 100644 index 0000000..6b3bf3a --- /dev/null +++ b/models/prms_service_available_controller.js @@ -0,0 +1,77 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Модели данных: описатели параметров функций контроллера доступности сервисов (класс ServiceAvailableController) +*/ + +//---------------------- +// Подключение библиотек +//---------------------- + +const Schema = require("validate"); //Схемы валидации +const { defServices } = require("./obj_services"); //Схема валидации списка сервисов +const { mail } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений +const { Logger } = require("../core/logger"); //Класс для протоколирования работы + +//------------------ +// Интерфейс модуля +//------------------ + +//Схема валидации параметров конструктора +exports.ServiceAvailableController = new Schema({ + //Параметры отправки E-Mail уведомлений + mail: { + schema: mail, + required: true, + message: { + required: path => `Не указаны параметры отправки E-Mail уведомлений (${path})` + } + }, + //Объект для протоколирования работы + logger: { + type: Logger, + required: true, + message: { + type: path => + `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, + required: path => `Не указаны объект для протоколирования работы (${path})` + } + } +}); + +//Схема валидации параметров функции отправки E-Mail уведомления о недоступности сервиса +exports.sendUnAvailableMail = new Schema({ + //Список адресов E-Mail для отправки уведомления + sTo: { + type: String, + required: true, + message: { + type: path => + `Список адресов E-Mail для отправки уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан cписок адресов E-Mail для отправки уведомления (${path})` + } + }, + //Заголовок сообщения + sSubject: { + type: String, + required: true, + message: { + type: path => `Заголовок сообщения (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан заголовок сообщения (${path})` + } + }, + //Текст уведомления + sMessage: { + type: String, + required: true, + message: { + type: path => `Текст уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан текст уведомления (${path})` + } + } +}); + +//Схема валидации параметров функции запуска контроллера +exports.startController = new Schema({ + //Список обслуживаемых сервисов + services: defServices(true, "services") +});