Переход на модуль с очередью рассылки уведомлений, уточнение алгоритмов проверки доступности удаленных сервисов (в расчёт берем только серверные ошибки 5xx, теперь есть таймаут проверки, чтобы не подвисало в случае корявых адресов удаленных серверов)

This commit is contained in:
Mikhail Chechnev 2019-01-07 01:46:22 +03:00
parent 2ed1401db7
commit 77a113238b
4 changed files with 83 additions and 36 deletions

View File

@ -12,6 +12,7 @@ const db = require("./db_connector"); //Взаимодействие с БД
const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений
const iq = require("./in_queue"); //Прослушивание очереди входящих сообщений
const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов
const ntf = require("./notifier"); //Отправка уведомлений
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { makeErrorText, validateObject, getIPs } = require("./utils"); //Вспомогательные функции
const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
@ -35,6 +36,8 @@ class ParusAppServer {
this.inQ = null;
//Контроллер доступности удалённых сервисов
this.srvAvlCtrl = null;
//Модуль отправки уведомлений
this.notifier = null;
//Флаг остановки сервера
this.bStopping = false;
//Список обслуживаемых сервисов
@ -46,6 +49,8 @@ class ParusAppServer {
this.onOutQStopped = this.onOutQStopped.bind(this);
this.onInQStarted = this.onInQStarted.bind(this);
this.onInQStopped = this.onInQStopped.bind(this);
this.onNotifierStarted = this.onNotifierStarted.bind(this);
this.onNotifierStopped = this.onNotifierStopped.bind(this);
this.onServiceACStarted = this.onServiceACStarted.bind(this);
this.onServiceACStopped = this.onServiceACStopped.bind(this);
}
@ -111,6 +116,29 @@ class ParusAppServer {
await this.logger.info(
`Обработчик очереди входящих сообщений запущен (порт - ${nPort}, доступные IP - ${getIPs().join("; ")})`
);
//Запускаем модуль отправки уведомлений
await this.logger.info("Запуск модуля отправки уведомлений...");
try {
this.notifier.startNotifier();
} catch (e) {
await this.logger.error(`Ошибка запуска модуля отправки уведомлений: ${makeErrorText(e)}`);
await this.stop();
return;
}
}
//При останове обработчика входящих сообщений
async onInQStopped() {
//Сообщим, что остановили обработчик
await this.logger.warn("Обработчик очереди входящих сообщений остановлен");
//Останавливаем модуль отправки уведомлений
await this.logger.warn("Останов модуля отправки уведомлений...");
if (this.notifier) this.notifier.stopNotifier();
else await this.onNotifierStopped();
}
//При запуске модуля отправки уведомлений
async onNotifierStarted() {
//Сообщим, что запустили модуль
await this.logger.info(`Модуль отправки уведомлений запущен`);
//Запускаем контроллер доступности удалённых сервисов
await this.logger.info("Запуск контроллера доступности удалённых сервисов...");
try {
@ -121,10 +149,10 @@ class ParusAppServer {
return;
}
}
//При останове обработчика входящих сообщений
async onInQStopped() {
//Сообщим, что остановили обработчик
await this.logger.warn("Обработчик очереди входящих сообщений остановлен");
//При останове модуля отправки уведомлений
async onNotifierStopped() {
//Сообщим, что остановили модуль
await this.logger.warn("Модуль отправки уведомлений остановлен");
//Останавливаем контроллер доступности удалённных сервисов
await this.logger.warn("Останов контроллера доступности удалённых сервисов...");
if (this.srvAvlCtrl) this.srvAvlCtrl.stopController();
@ -172,8 +200,10 @@ class ParusAppServer {
this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger });
//Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({ inComing: prms.config.inComing, dbConn: this.dbConn, logger: this.logger });
//Создаём контроллер доступности удалённых сервислв
this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail });
//Создаём модуль рассылки уведомлений
this.notifier = new ntf.Notifier({ logger: this.logger, mail: prms.config.mail });
//Создаём контроллер доступности удалённых сервисов
this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, notifier: this.notifier });
//Скажем что инициализировали
await this.logger.info("Сервер приложений инициализирован");
} else {
@ -197,6 +227,9 @@ class ParusAppServer {
//Включим прослушивание событий обработчика входящих сообщений
this.inQ.on(iq.SEVT_IN_QUEUE_STARTED, this.onInQStarted);
this.inQ.on(iq.SEVT_IN_QUEUE_STOPPED, this.onInQStopped);
//Включим прослушивание событий модуля рассылки уведомлений
this.notifier.on(ntf.SEVT_NOTIFIER_STARTED, this.onNotifierStarted);
this.notifier.on(ntf.SEVT_NOTIFIER_STOPPED, this.onNotifierStopped);
//Включим прослушивание событий контроллера доступности удалённых сервисов
this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted);
this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped);
@ -209,7 +242,7 @@ class ParusAppServer {
if (!this.bStopping) {
//Установим флаг - остановка в процессе
this.bStopping = true;
//Сообщаем, что начала останов сервера
//Сообщаем, что начался останов сервера
await this.logger.warn("Останов сервера приложений...");
//Останов обслуживания очереди исходящих
await this.logger.warn("Останов обработчика очереди исходящих сообщений...");

View File

@ -12,7 +12,7 @@ const rqp = require("request-promise"); //Работа с HTTP/HTTPS запро
const EventEmitter = require("events"); //Обработчик пользовательских событий
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_SERVICE_UNAVAILABLE, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject, sendMail } = require("./utils"); //Вспомогательные функции
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
const prmsServiceAvailableControllerSchema = require("../models/prms_service_available_controller"); //Схемы валидации параметров функций класса
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервисов
@ -40,7 +40,7 @@ class ServiceAvailableController extends EventEmitter {
constructor(prms) {
//Создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта для подключения
//Проверяем структуру переданного набора параметров для конструктора
let sCheckResult = validateObject(
prms,
prmsServiceAvailableControllerSchema.ServiceAvailableController,
@ -58,17 +58,19 @@ class ServiceAvailableController extends EventEmitter {
this.bInDetectingLoop = false;
//Идентификатор таймера проверки доступности сервисов
this.nDetectingLoopTimeOut = null;
//Запомним параметры отправки E-Mail
this.mail = prms.mail;
//Запомним уведомитель
this.notifier = prms.notifier;
//Запомним логгер
this.logger = prms.logger;
//Установим таймаут проведки адреса сервиса (мс)
this.nCheckTimeout = 10000;
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.serviceDetectingLoop = this.serviceDetectingLoop.bind(this);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Уведомление об запуске контроллера
//Уведомление о запуске контроллера
notifyStarted() {
//Оповестим подписчиков о запуске
this.emit(SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED);
@ -86,15 +88,15 @@ class ServiceAvailableController extends EventEmitter {
await this.serviceDetectingLoop();
}, NDETECTING_LOOP_INTERVAL);
} else {
//Если мы не работаем и просили оповстить об останове (видимо была команда на останов) - сделаем это
//Если мы не работаем и просили оповестить об останове (видимо была команда на останов) - сделаем это
if (this.bNotifyStopped) this.notifyStopped();
}
}
//Опрос очереди исходящих сообщений
//Опрос доступности сервисов
async serviceDetectingLoop() {
//Если есть сервисы для опроса
if (this.services && Array.isArray(this.services) && this.services.length > 0) {
//Выставим флаг - цикл опроса кативен
//Выставим флаг - цикл опроса активен
this.bInDetectingLoop = true;
try {
//Обходим список сервисов для проверки
@ -106,7 +108,7 @@ class ServiceAvailableController extends EventEmitter {
) {
try {
//Отправляем проверочный запрос
await rqp({ url: this.services[i].sSrvRoot });
await rqp({ url: this.services[i].sSrvRoot, timeout: this.nCheckTimeout });
//Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности
this.services[i].dAvailable = new Date();
this.services[i].dUnAvailable = null;
@ -116,14 +118,24 @@ class ServiceAvailableController extends EventEmitter {
//Сформируем текст ошибки в зависимости от того, что случилось
let sError = "Неожиданная ошибка удалённого сервиса";
if (e.error) {
sError = `Ошибка передачи данных: ${e.error.code}`;
let sSubError = e.error.code || e.error;
if (e.error.code === "ESOCKETTIMEDOUT")
sSubError = `сервис не ответил на запрос в течение ${this.nCheckTimeout} мс`;
sError = `Ошибка передачи данных: ${sSubError}`;
}
if (e.response) {
//Нам нужны только ошибки сервера
if (String(e.response.statusCode).startsWith("5")) {
sError = `Ошибка работы удалённого сервиса: ${e.response.statusCode} - ${
e.response.statusMessage
}`;
} else {
//Остальное - клиентские ошибки, но сервер-то вроде отвечает, поэтому - пропускаем
this.services[i].dUnAvailable = null;
}
//Фиксируем ошибку проверки в протоколе
}
//Фиксируем ошибку проверки в протоколе (только если она действительно была)
if (this.services[i].dUnAvailable) {
await this.logger.warn(
`При проверке доступности сервиса ${this.services[i].sCode}: ${makeErrorText(
new ServerError(SERR_SERVICE_UNAVAILABLE, sError)
@ -131,6 +143,7 @@ class ServiceAvailableController extends EventEmitter {
{ nServiceId: this.services[i].nId }
);
}
}
//Если есть даты - будем проверять
if (this.services[i].dUnAvailable && this.services[i].dAvailable) {
//Выясним как долго он уже недоступен (в минутах)
@ -151,8 +164,7 @@ class ServiceAvailableController extends EventEmitter {
//И в почту, если есть список адресов
if (this.services[i].sUnavlblNtfMail) {
try {
await sendMail({
mail: this.mail,
this.notifier.addMessage({
sTo: this.services[i].sUnavlblNtfMail,
sSubject,
sMessage
@ -204,7 +216,7 @@ class ServiceAvailableController extends EventEmitter {
s.dUnAvailable = null;
s.dAvailable = new Date();
});
//Начинаем слушать очередь исходящих
//Начинаем проверять список сервисов
setTimeout(this.serviceDetectingLoop, NDETECTING_LOOP_DELAY);
//И оповещаем всех что запустились
this.notifyStarted();

View File

@ -73,7 +73,7 @@ const start = async () => {
await appSrv.run();
} catch (e) {
//Если есть ошибки с которыми сервер не справился - ловим их, показываем...
appSrv.logger.error(makeErrorText(e));
await appSrv.logger.error(makeErrorText(e));
//...и пытаемся остановить сервер нормально
try {
await appSrv.stop();

View File

@ -9,7 +9,7 @@
const Schema = require("validate"); //Схемы валидации
const { defServices } = require("./obj_services"); //Схема валидации списка сервисов
const { mail } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений
const { Notifier } = require("../core/notifier"); //Класс рассылки уведомлений
const { Logger } = require("../core/logger"); //Класс для протоколирования работы
//------------------
@ -18,12 +18,14 @@ const { Logger } = require("../core/logger"); //Класс для протоко
//Схема валидации параметров конструктора
exports.ServiceAvailableController = new Schema({
//Параметры отправки E-Mail уведомлений
mail: {
schema: mail,
//Объект для рассылки уведомлений
notifier: {
type: Notifier,
required: true,
message: {
required: path => `Не указаны параметры отправки E-Mail уведомлений (${path})`
type: path =>
`Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`,
required: path => `Не указан объект для рассылки уведомлений (${path})`
}
},
//Объект для протоколирования работы