Внедрение сервиса контроля доступности удалённых серверов в приложение

This commit is contained in:
Mikhail Chechnev 2018-12-10 00:08:13 +03:00
parent 79475a7224
commit 4dc3c9fc8d
3 changed files with 404 additions and 8 deletions

View File

@ -10,8 +10,9 @@
const lg = require("./logger"); //Протоколирование работы const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД const db = require("./db_connector"); //Взаимодействие с БД
const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений
const sac = require("./service_available_controller"); //Контроль доступности удалённых сервисов
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { validateObject } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const prmsAppSchema = require("../models/prms_app"); //Схема валидации параметров функций класса const prmsAppSchema = require("../models/prms_app"); //Схема валидации параметров функций класса
@ -29,6 +30,8 @@ class ParusAppServer {
this.dbConn = null; this.dbConn = null;
//Обработчик очереди исходящих //Обработчик очереди исходящих
this.outQ = null; this.outQ = null;
//Контроллер доступности удалённых сервисов
this.srvAvlCtrl = null;
//Флаг остановки сервера //Флаг остановки сервера
this.bStopping = false; this.bStopping = false;
//Список обслуживаемых сервисов //Список обслуживаемых сервисов
@ -38,6 +41,8 @@ class ParusAppServer {
this.onDBDisconnected = this.onDBDisconnected.bind(this); this.onDBDisconnected = this.onDBDisconnected.bind(this);
this.onOutQStarted = this.onOutQStarted.bind(this); this.onOutQStarted = this.onOutQStarted.bind(this);
this.onOutQStopped = this.onOutQStopped.bind(this); this.onOutQStopped = this.onOutQStopped.bind(this);
this.onServiceACStarted = this.onServiceACStarted.bind(this);
this.onServiceACStopped = this.onServiceACStopped.bind(this);
} }
//При подключении к БД //При подключении к БД
async onDBConnected(connection) { async onDBConnected(connection) {
@ -50,7 +55,7 @@ class ParusAppServer {
try { try {
this.services = await this.dbConn.getServices(); this.services = await this.dbConn.getServices();
} catch (e) { } catch (e) {
await this.logger.error("Ошибка получения списка сервисов: " + e.sCode + ": " + e.sMessage); await this.logger.error(`Ошибка получения списка сервисов: ${makeErrorText(e)}`);
await this.stop(); await this.stop();
return; return;
} }
@ -60,14 +65,10 @@ class ParusAppServer {
try { try {
this.outQ.startProcessing({ services: this.services }); this.outQ.startProcessing({ services: this.services });
} catch (e) { } catch (e) {
await this.logger.error( await this.logger.error(`Ошибка запуска обработчика очереди исходящих сообщений: ${makeErrorText(e)}`);
"Ошибка запуска обработчика очереди исходящих сообщений: " + e.sCode + ": " + e.sMessage
);
await this.stop(); await this.stop();
return; return;
} }
//Рапортуем, что запустились
await this.logger.info("Сервер приложений запущен");
} }
//При отключении от БД //При отключении от БД
async onDBDisconnected() { async onDBDisconnected() {
@ -80,11 +81,37 @@ class ParusAppServer {
async onOutQStarted() { async onOutQStarted() {
//Сообщим, что запустили обработчик //Сообщим, что запустили обработчик
await this.logger.info("Обработчик очереди исходящих сообщений запущен"); await this.logger.info("Обработчик очереди исходящих сообщений запущен");
//Запускаем обслуживание очереди исходящих
await this.logger.info("Запуск контроллера доступности удалённых сервисов...");
try {
this.srvAvlCtrl.startController({ services: this.services });
} catch (e) {
await this.logger.error(`Ошибка запуска контроллера доступности удалённых сервисов: ${makeErrorText(e)}`);
await this.stop();
return;
}
//Рапортуем, что запустились
await this.logger.info("Сервер приложений запущен");
} }
//При останове обработчика исходящих сообщений //При останове обработчика исходящих сообщений
async onOutQStopped() { async onOutQStopped() {
//Сообщим, что остановили обработчик //Сообщим, что остановили обработчик
await this.logger.warn("Обработчик очереди исходящих сообщений остановлен"); await this.logger.warn("Обработчик очереди исходящих сообщений остановлен");
//Останавливаем контроллер доступности удалённных сервисов
await this.logger.warn("Останов контроллера доступности удалённых сервисов...");
if (this.srvAvlCtrl) this.srvAvlCtrl.stopController();
else await this.onServiceACStopped();
}
//При запуске контроллера доступности удаленных сервисов
async onServiceACStarted() {
//Сообщим, что запустили обработчик
await this.logger.info("Контроллер доступности удалённых сервисов запущен");
}
//При останове контроллера доступности удаленных сервисов
async onServiceACStopped() {
//Сообщим, что остановили обработчик
await this.logger.warn("Контроллер доступности удалённых сервисов остановлен");
//Отключение от БД //Отключение от БД
if (this.dbConn) { if (this.dbConn) {
if (this.dbConn.bConnected) { if (this.dbConn.bConnected) {
@ -114,6 +141,8 @@ class ParusAppServer {
this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect }); this.dbConn = new db.DBConnector({ connectSettings: prms.config.dbConnect });
//Создаём обработчик очереди исходящих //Создаём обработчик очереди исходящих
this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger }); this.outQ = new oq.OutQueue({ outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger });
//Создаём контроллер доступности удалённых сервислв
this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, mail: prms.config.mail });
//Скажем что инициализировали //Скажем что инициализировали
await this.logger.info("Сервер приложений инициализирован"); await this.logger.info("Сервер приложений инициализирован");
} else { } else {
@ -134,6 +163,9 @@ class ParusAppServer {
//Включим прослушивание событий обработчика исходящих сообщений //Включим прослушивание событий обработчика исходящих сообщений
this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted); this.outQ.on(oq.SEVT_OUT_QUEUE_STARTED, this.onOutQStarted);
this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped); this.outQ.on(oq.SEVT_OUT_QUEUE_STOPPED, this.onOutQStopped);
//Включим прослушивание событий контроллера доступности удалённых сервисов
this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED, this.onServiceACStarted);
this.srvAvlCtrl.on(sac.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED, this.onServiceACStopped);
//Подключаемся к БД //Подключаемся к БД
await this.logger.info("Подключение сервера приложений к БД..."); await this.logger.info("Подключение сервера приложений к БД...");
await this.dbConn.connect(); await this.dbConn.connect();
@ -148,7 +180,7 @@ class ParusAppServer {
//Останов обслуживания очереди исходящих //Останов обслуживания очереди исходящих
await this.logger.warn("Останов обработчика очереди исходящих сообщений..."); await this.logger.warn("Останов обработчика очереди исходящих сообщений...");
if (this.outQ) this.outQ.stopProcessing(); if (this.outQ) this.outQ.stopProcessing();
else this.onOutQStopped(); else await this.onOutQStopped();
} }
} }
} }

View File

@ -0,0 +1,287 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: контроль доступности сервисов
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const EventEmitter = require("events"); //Обработчик пользовательских событий
const nodemailer = require("nodemailer"); //Отправка E-Mail сообщений
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_SERVICE_UNAVAILABLE, SERR_MAIL_FAILED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
const prmsServiceAvailableControllerSchema = require("../models/prms_service_available_controller"); //Схемы валидации параметров функций класса
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервисов
//--------------------------
// Глобальные идентификаторы
//--------------------------
//Типовые события
const SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED = "SERVICE_AVAILABLE_CONTROLLER_STARTED"; //Контроллер доступности сервисов запущен
const SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED = "SERVICE_AVAILABLE_CONTROLLER_STOPPED"; //Контроллер доступности сервисов остановлен
//Время отложенного старта цикла контроля (мс)
const NDETECTING_LOOP_DELAY = 3000;
//Интервал проверки доступности сервисов (мс)
const NDETECTING_LOOP_INTERVAL = 60000;
//------------
// Тело модуля
//------------
//Класс контроллера доступности сервисов
class ServiceAvailableController extends EventEmitter {
//Конструктор класса
constructor(prms) {
//Создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта для подключения
let sCheckResult = validateObject(
prms,
prmsServiceAvailableControllerSchema.ServiceAvailableController,
"Параметры конструктора класса ServiceAvailableController"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Список обслуживаемых сервисов
this.services = null;
//Признак функционирования контроллера
this.bWorking = false;
//Признак необходимости оповещения об останове
this.bNotifyStopped = false;
//Флаг работы цикла проверки
this.bInDetectingLoop = false;
//Идентификатор таймера проверки доступности сервисов
this.nDetectingLoopTimeOut = null;
//Запомним параметры отправки E-Mail
this.mail = prms.mail;
//Запомним логгер
this.logger = prms.logger;
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.serviceDetectingLoop = this.serviceDetectingLoop.bind(this);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Уведомление об запуске контроллера
notifyStarted() {
//Оповестим подписчиков о запуске
this.emit(SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED);
}
//Уведомление об остановке контроллера
notifyStopped() {
//Оповестим подписчиков об останове
this.emit(SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED);
}
//Отправка E-Mail уведомления о недоступности сервиса
sendUnAvailableMail(prms) {
return new Promise((resolve, reject) => {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsServiceAvailableControllerSchema.sendUnAvailableMail,
"Параметры функции отправки E-Mail уведомления о недоступности удалённого сервиса"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Параметры подключения к SMTP-серверу
let transporter = nodemailer.createTransport({
host: this.mail.sHost,
port: this.mail.nPort,
secure: this.mail.nPort == 465,
auth: {
user: this.mail.sUser,
pass: this.mail.sPass
}
});
//Параметры отправляемого сообщения
let mailOptions = {
from: this.mail.sFrom,
to: prms.sTo,
subject: prms.sSubject,
text: prms.sMessage
};
//Отправляем сообщение
transporter.sendMail(mailOptions, (error, info) => {
if (error) {
reject(new ServerError(SERR_MAIL_FAILED, `${error.code}: ${error.response}`));
} else {
if (info.rejected && Array.isArray(info.rejected) && info.rejected.length > 0) {
reject(
new ServerError(
SERR_MAIL_FAILED,
`Сообщение не доствлено адресатам: ${info.rejected.join(", ")}`
)
);
} else {
resolve(info);
}
}
});
} else {
reject(new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult));
}
});
}
//Перезапуск опроса списка сервисов
async restartDetectingLoop() {
//Включаем опрос сервисов только если установлен флаг работы
if (this.bWorking) {
this.nDetectingLoopTimeOut = await setTimeout(async () => {
await this.serviceDetectingLoop();
}, NDETECTING_LOOP_INTERVAL);
} else {
//Если мы не работаем и просили оповстить об останове (видимо была команда на останов) - сделаем это
if (this.bNotifyStopped) this.notifyStopped();
}
}
//Опрос очереди исходящих сообщений
async serviceDetectingLoop() {
//Если есть сервисы для опроса
if (this.services && Array.isArray(this.services) && this.services.length > 0) {
//Выставим флаг - цикл опроса кативен
this.bInDetectingLoop = true;
try {
//Обходим список сервисов для проверки
for (let i = 0; i < this.services.length; i++) {
//Если сервис надо проверять на доступность и это сервис для отправки исходящих сообщений
if (
this.services[i].nUnavlblNtfSign == objServiceSchema.NUNAVLBL_NTF_SIGN_YES &&
this.services[i].nSrvType == objServiceSchema.NSRV_TYPE_SEND
) {
try {
//Отправляем проверочный запрос
await rqp({ url: this.services[i].sSrvRoot });
//Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности
this.services[i].dAvailable = new Date();
this.services[i].dUnAvailable = null;
} catch (e) {
//Зафиксируем дату и время недоступности
this.services[i].dUnAvailable = new Date();
//Сформируем текст ошибки в зависимости от того, что случилось
let sError = "Неожиданная ошибка удалённого сервиса";
if (e.error) {
sError = `Ошибка передачи данных: ${e.error.code}`;
}
if (e.response) {
sError = `Ошибка работы удалённого сервиса: ${e.response.statusCode} - ${
e.response.statusMessage
}`;
}
//Фиксируем ошибку проверки в протоколе
await this.logger.warn(
`При проверке доступности сервиса ${this.services[i].sCode}: ${makeErrorText(
new ServerError(SERR_SERVICE_UNAVAILABLE, sError)
)} (адрес - ${this.services[i].sSrvRoot})`,
{ nServiceId: this.services[i].nId }
);
}
//Если есть даты - будем проверять
if (this.services[i].dUnAvailable && this.services[i].dAvailable) {
//Выясним как долго он уже недоступен (в минутах)
let nDiffMs = this.services[i].dUnAvailable - this.services[i].dAvailable;
let nDiffMins = Math.round(((nDiffMs % 86400000) % 3600000) / 60000);
//Если простой больше указанного в настройках - будем оповещать по почте
if (nDiffMins >= this.services[i].nUnavlblNtfTime) {
let sSubject = `Удалённый сервис ${this.services[i].sCode} неотвечает на запросы`;
let sMessage = `Сервис недоступен более ${
this.services[i].nUnavlblNtfTime
} мин. (${nDiffMins} мин. с момента запуска сервера приложений).\nАдрес сервиса: ${
this.services[i].sSrvRoot
}`;
await this.logger.error(sMessage, { nServiceId: this.services[i].nId });
try {
await this.sendUnAvailableMail({
sTo: this.services[i].sUnavlblNtfMail,
sSubject,
sMessage
});
} catch (e) {
await this.logger.error(makeErrorText(e), {
nServiceId: this.services[i].nId
});
}
}
}
}
}
} catch (e) {
//Фиксируем ошибку в протоколе работы сервера приложений
await this.logger.error(makeErrorText(e));
}
//Выставим флаг - цикл опроса неактивен
this.bInDetectingLoop = false;
//Перезапускаем опрос
await this.restartDetectingLoop();
} else {
//Выставим флаг - цикл опроса неактивен
this.bInDetectingLoop = false;
//Опрашивать нечего - ждём и перезапускаем цикл опроса
await this.restartDetectingLoop();
}
}
//Запуск контроллера
startController(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsServiceAvailableControllerSchema.startController,
"Параметры функции запуска контроллера доступности сервисов"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Выставляем флаг работы
this.bWorking = true;
//Выставляем флаг необходимости оповещения об останове
this.bNotifyStopped = false;
//Выставляем флаг неактивности (пока) цикла опроса
this.bInDetectingLoop = false;
//запоминаем список обслуживаемых сервисов и инициализируем даты доступности и недоступности
this.services = _.cloneDeep(prms.services);
this.services.forEach(s => {
s.dUnAvailable = null;
s.dAvailable = new Date();
});
//Начинаем слушать очередь исходящих
setTimeout(this.serviceDetectingLoop, NDETECTING_LOOP_DELAY);
//И оповещаем всех что запустились
this.notifyStarted();
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Остановка контроллера
stopController() {
//Выставляем флаг неработы
this.bWorking = false;
//Если сейчас мы не в цикле проверки
if (!this.bInDetectingLoop) {
//Сбросим его таймер, чтобы он не запустился снова
if (this.nDetectingLoopTimeOut) {
clearTimeout(this.nDetectingLoopTimeOut);
this.nDetectingLoopTimeOut = null;
}
//Выставляем флаг - не надо оповещать об останове
this.bNotifyStopped = false;
//Оповестим об останове
this.notifyStopped();
} else {
//Выставляем флаг необходимости оповещения об останове (это будет сделано автоматически по завершению цикла проверки)
this.bNotifyStopped = true;
}
}
}
//-----------------
// Интерфейс модуля
//-----------------
exports.SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED = SEVT_SERVICE_AVAILABLE_CONTROLLER_STARTED;
exports.SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED = SEVT_SERVICE_AVAILABLE_CONTROLLER_STOPPED;
exports.ServiceAvailableController = ServiceAvailableController;

View File

@ -0,0 +1,77 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модели данных: описатели параметров функций контроллера доступности сервисов (класс ServiceAvailableController)
*/
//----------------------
// Подключение библиотек
//----------------------
const Schema = require("validate"); //Схемы валидации
const { defServices } = require("./obj_services"); //Схема валидации списка сервисов
const { mail } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений
const { Logger } = require("../core/logger"); //Класс для протоколирования работы
//------------------
// Интерфейс модуля
//------------------
//Схема валидации параметров конструктора
exports.ServiceAvailableController = new Schema({
//Параметры отправки E-Mail уведомлений
mail: {
schema: mail,
required: true,
message: {
required: path => `Не указаны параметры отправки E-Mail уведомлений (${path})`
}
},
//Объект для протоколирования работы
logger: {
type: Logger,
required: true,
message: {
type: path =>
`Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`,
required: path => `Не указаны объект для протоколирования работы (${path})`
}
}
});
//Схема валидации параметров функции отправки E-Mail уведомления о недоступности сервиса
exports.sendUnAvailableMail = new Schema({
//Список адресов E-Mail для отправки уведомления
sTo: {
type: String,
required: true,
message: {
type: path =>
`Список адресов E-Mail для отправки уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан cписок адресов E-Mail для отправки уведомления (${path})`
}
},
//Заголовок сообщения
sSubject: {
type: String,
required: true,
message: {
type: path => `Заголовок сообщения (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан заголовок сообщения (${path})`
}
},
//Текст уведомления
sMessage: {
type: String,
required: true,
message: {
type: path => `Текст уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан текст уведомления (${path})`
}
}
});
//Схема валидации параметров функции запуска контроллера
exports.startController = new Schema({
//Список обслуживаемых сервисов
services: defServices(true, "services")
});