419 lines
25 KiB
JavaScript
419 lines
25 KiB
JavaScript
/*
|
||
Сервис интеграции ПП Парус 8 с WEB API
|
||
Модуль ядра: отработка очереди входящих сообщений
|
||
*/
|
||
|
||
//------------------------------
|
||
// Подключение внешних библиотек
|
||
//------------------------------
|
||
|
||
const _ = require("lodash"); //Работа с массивами и коллекциями
|
||
const EventEmitter = require("events"); //Обработчик пользовательских событий
|
||
const express = require("express"); //WEB-сервер Express
|
||
const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса)
|
||
const { ServerError } = require("./server_errors"); //Типовая ошибка
|
||
const { makeErrorText, validateObject, buildURL, getAppSrvFunction } = require("./utils"); //Вспомогательные функции
|
||
const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
|
||
const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений
|
||
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса
|
||
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
|
||
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
|
||
const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса
|
||
const {
|
||
SERR_OBJECT_BAD_INTERFACE,
|
||
SERR_WEB_SERVER,
|
||
SERR_APP_SERVER_BEFORE,
|
||
SERR_APP_SERVER_AFTER,
|
||
SERR_DB_SERVER,
|
||
SERR_UNAUTH
|
||
} = require("./constants"); //Общесистемные константы
|
||
|
||
//--------------------------
|
||
// Глобальные идентификаторы
|
||
//--------------------------
|
||
|
||
//Типовые события
|
||
const SEVT_IN_QUEUE_STARTED = "IN_QUEUE_STARTED"; //Обработчик очереди запущен
|
||
const SEVT_IN_QUEUE_STOPPED = "IN_QUEUE_STOPPED"; //Обработчик очереди остановлен
|
||
|
||
//------------
|
||
// Тело модуля
|
||
//------------
|
||
|
||
//Класс очереди входящих сообщений
|
||
class InQueue extends EventEmitter {
|
||
//Конструктор класса
|
||
constructor(prms) {
|
||
//Создадим экземпляр родительского класса
|
||
super();
|
||
//Проверяем структуру переданного объекта для подключения
|
||
let sCheckResult = validateObject(prms, prmsInQueueSchema.InQueue, "Параметры конструктора класса InQueue");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Список обслуживаемых сервисов
|
||
this.services = null;
|
||
//Признак функционирования обработчика
|
||
this.bWorking = false;
|
||
//Параметры очереди
|
||
this.inComing = _.cloneDeep(prms.inComing);
|
||
//Запомним подключение к БД
|
||
this.dbConn = prms.dbConn;
|
||
//Запомним логгер
|
||
this.logger = prms.logger;
|
||
//Запомним уведомитель
|
||
this.notifier = prms.notifier;
|
||
//WEB-приложение
|
||
this.webApp = express();
|
||
//WEB-сервер
|
||
this.srv = null;
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Уведомление о запуске обработчика очереди
|
||
notifyStarted() {
|
||
//Оповестим подписчиков о запуске
|
||
this.emit(SEVT_IN_QUEUE_STARTED, this.inComing.nPort);
|
||
}
|
||
//Уведомление об остановке обработчика очереди
|
||
notifyStopped() {
|
||
//Оповестим подписчиков об останове
|
||
this.emit(SEVT_IN_QUEUE_STOPPED);
|
||
}
|
||
//Обработка сообщения
|
||
async processMessage(prms) {
|
||
//Проверяем структуру переданного объекта для обработки
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsInQueueSchema.processMessage,
|
||
"Параметры функции обработки входящего сообщения"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Буфер для сообщения очереди
|
||
let q = null;
|
||
try {
|
||
//Тело сообщения и ответ на него
|
||
let blMsg = null;
|
||
let blResp = null;
|
||
//Определимся с телом сообщения - для POST сообщений - это тело запроса
|
||
if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) {
|
||
blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null;
|
||
} else {
|
||
//Для GET - параметры запроса
|
||
if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query));
|
||
}
|
||
//Кладём сообщение в очередь
|
||
q = await this.dbConn.putQueue({
|
||
nServiceFnId: prms.function.nId,
|
||
blMsg
|
||
});
|
||
//Скажем что пришло новое входящее сообщение
|
||
await this.logger.info(
|
||
`Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${
|
||
prms.function.sCode
|
||
} (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`,
|
||
{ nQueueId: q.nId }
|
||
);
|
||
//Выполняем обработчик "До" (если он есть)
|
||
if (prms.function.sAppSrvBefore) {
|
||
//Выставим статус сообщению очереди - исполняется сервером приложений
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
|
||
});
|
||
//Выполняем
|
||
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);
|
||
let resBefore = null;
|
||
try {
|
||
let resBeforePrms = _.cloneDeep(prms);
|
||
resBeforePrms.queue = _.cloneDeep(q);
|
||
resBeforePrms.queue.blMsg = blMsg;
|
||
resBeforePrms.queue.blResp = blResp;
|
||
resBefore = await fnBefore(resBeforePrms);
|
||
} catch (e) {
|
||
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
|
||
}
|
||
//Проверяем структуру ответа функции предобработки
|
||
if (resBefore) {
|
||
let sCheckResult = validateObject(
|
||
resBefore,
|
||
objInQueueSchema.InQueueProcessorFnBefore,
|
||
"Результат функции предобработки входящего сообщения"
|
||
);
|
||
//Если структура ответа в норме
|
||
if (!sCheckResult) {
|
||
//Выставим статус сообщению очереди - исполнено сервером приложений
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
|
||
});
|
||
//Фиксируем результат исполнения
|
||
if (!_.isUndefined(resBefore.blMsg)) {
|
||
blMsg = resBefore.blMsg;
|
||
q = await this.dbConn.setQueueMsg({
|
||
nQueueId: q.nId,
|
||
blMsg
|
||
});
|
||
}
|
||
if (!_.isUndefined(resBefore.blResp)) {
|
||
blResp = resBefore.blResp;
|
||
q = await this.dbConn.setQueueResp({
|
||
nQueueId: q.nId,
|
||
blResp,
|
||
nIsOriginal: NIS_ORIGINAL_NO
|
||
});
|
||
}
|
||
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
|
||
if (!_.isUndefined(resBefore.bUnAuth))
|
||
if (resBefore.bUnAuth === true)
|
||
throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
||
} else {
|
||
//Или расскажем об ошибке
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
}
|
||
//Вызываем обработчик со стороны БД (если он есть)
|
||
if (prms.function.sPrcResp) {
|
||
//Фиксируем начало исполнения сервером БД - в статусе сообщения
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
|
||
});
|
||
//Вызов обработчика БД
|
||
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
|
||
//Если результат - ошибка пробрасываем её
|
||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR)
|
||
throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
|
||
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
|
||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH)
|
||
throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
|
||
//Выставим статус сообщению очереди - исполнено обработчиком БД
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
|
||
});
|
||
//Считаем ответ полученный от системы
|
||
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
|
||
blResp = qData.blResp;
|
||
}
|
||
//Выполняем обработчик "После" (если он есть)
|
||
if (prms.function.sAppSrvAfter) {
|
||
//Выставим статус сообщению очереди - исполняется сервером приложений
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
|
||
});
|
||
//Выполняем
|
||
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
|
||
let resAfter = null;
|
||
try {
|
||
let resAfterPrms = _.cloneDeep(prms);
|
||
resAfterPrms.queue = _.cloneDeep(q);
|
||
resAfterPrms.queue.blMsg = blMsg;
|
||
resAfterPrms.queue.blResp = blResp;
|
||
resAfter = await fnAfter(resAfterPrms);
|
||
} catch (e) {
|
||
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
|
||
}
|
||
//Проверяем структуру ответа функции предобработки
|
||
if (resAfter) {
|
||
let sCheckResult = validateObject(
|
||
resAfter,
|
||
objInQueueSchema.InQueueProcessorFnAfter,
|
||
"Результат функции постобработки входящего сообщения"
|
||
);
|
||
//Если структура ответа в норме
|
||
if (!sCheckResult) {
|
||
//Выставим статус сообщению очереди - исполнено сервером приложений
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
|
||
});
|
||
//Фиксируем результат исполнения
|
||
if (!_.isUndefined(resAfter.blResp)) {
|
||
blResp = resAfter.blResp;
|
||
q = await this.dbConn.setQueueResp({
|
||
nQueueId: q.nId,
|
||
blResp,
|
||
nIsOriginal: NIS_ORIGINAL_NO
|
||
});
|
||
}
|
||
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
|
||
if (!_.isUndefined(resAfter.bUnAuth))
|
||
if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
||
} else {
|
||
//Или расскажем об ошибке
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
}
|
||
//Всё успешно - отдаём результат клиенту
|
||
prms.res.status(200).send(blResp);
|
||
//Фиксируем успех обработки - в статусе сообщения
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
|
||
});
|
||
//Фиксируем успех обработки - в протоколе работы сервиса
|
||
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
|
||
} catch (e) {
|
||
//Тема и текст уведомления об ошибке
|
||
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${
|
||
prms.function.sCode
|
||
}" сервиса "${prms.service.sCode}"`;
|
||
let sMessage = makeErrorText(e);
|
||
//Если сообщение очереди успели создать
|
||
if (q) {
|
||
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
|
||
q = await this.dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
sExecMsg: sMessage,
|
||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
|
||
await this.logger.error(
|
||
`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`,
|
||
{ nQueueId: q.nId }
|
||
);
|
||
//Добавим чуть больше информации в тему сообщения
|
||
sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${
|
||
prms.function.sCode
|
||
}" сервиса "${prms.service.sCode}"`;
|
||
} else {
|
||
//Ограничимся общей ошибкой
|
||
await this.logger.error(sMessage, {
|
||
nServiceId: prms.service.nId,
|
||
nServiceFnId: prms.function.nId
|
||
});
|
||
}
|
||
//Если для функции-обработчика указан признак необходимости оповещения об ошибках
|
||
if (prms.function.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) {
|
||
//Отправим уведомление об ошибке отработки в почту
|
||
await this.notifier.addMessage({
|
||
sTo: prms.function.sErrNtfMail,
|
||
sSubject,
|
||
sMessage
|
||
});
|
||
}
|
||
//Отправим ошибку клиенту
|
||
prms.res.status(500).send(makeErrorText(e));
|
||
}
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Запуск обработки очереди входящих сообщений
|
||
startProcessing(prms) {
|
||
//Проверяем структуру переданного объекта для старта
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsInQueueSchema.startProcessing,
|
||
"Параметры функции запуска обработки очереди входящих сообщений"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Выставляем флаг работы
|
||
this.bWorking = true;
|
||
//запоминаем список обслуживаемых сервисов
|
||
this.services = prms.services;
|
||
//Конфигурируем сервер - обработка тела сообщения
|
||
this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" }));
|
||
//Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений
|
||
_.forEach(_.filter(this.services, { nSrvType: objServiceSchema.NSRV_TYPE_RECIVE }), srvs => {
|
||
//Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает
|
||
this.webApp.all(srvs.sSrvRoot, (req, res) => {
|
||
res.status(200).send(
|
||
`<html><body><center><br><h1>Сервер приложений ПП Пурс 8</h1><h3>Сервис: ${
|
||
srvs.sName
|
||
}</h3></center></body></html>`
|
||
);
|
||
});
|
||
//Для всех функций сервиса...
|
||
_.forEach(srvs.functions, fn => {
|
||
//...собственный обработчик, в зависимости от указанного способа передачи параметров
|
||
this.webApp[fn.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST ? "post" : "get"](
|
||
buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }),
|
||
async (req, res) => {
|
||
try {
|
||
//Вызываем обработчик
|
||
await this.processMessage({ req, res, service: srvs, function: fn });
|
||
} catch (e) {
|
||
//Протоколируем в журнал работы сервера
|
||
await this.logger.error(makeErrorText(e), {
|
||
nServiceId: srvs.nId,
|
||
nServiceFnId: fn.nId
|
||
});
|
||
//Отправим ошибку клиенту
|
||
res.status(500).send(makeErrorText(e));
|
||
}
|
||
}
|
||
);
|
||
//...и собственный обработчик ошибок
|
||
this.webApp.use(
|
||
buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }),
|
||
async (err, req, res, next) => {
|
||
//Протоколируем в журнал работы сервера
|
||
await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
|
||
nServiceId: srvs.nId,
|
||
nServiceFnId: fn.nId
|
||
});
|
||
//Отправим ошибку клиенту
|
||
res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
|
||
}
|
||
);
|
||
});
|
||
});
|
||
//Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND
|
||
this.webApp.use("*", (req, res) => {
|
||
res.status(404).send(
|
||
"<html><body><center><br><h1>Сервер приложений ПП Пурс 8</h1><h3>Запрошенный адрес не найден</h3></center></body></html>"
|
||
);
|
||
});
|
||
//Ошибки, не отработанные индивидуальными обработчиками - 500 SERVER ERROR
|
||
this.webApp.use(async (err, req, res, next) => {
|
||
//Протоколируем в журнал работы сервера
|
||
await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
|
||
//Отправим ошибку клиенту
|
||
res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
|
||
});
|
||
//Запускаем сервер
|
||
this.srv = this.webApp.listen(this.inComing.nPort, () => {
|
||
//И оповещаем всех что запустились
|
||
this.notifyStarted();
|
||
});
|
||
this.srv.on("error", e => {
|
||
throw new ServerError(e.code, `Фатальная ошибка обработчика очереди входящих сообщений: ${e.message}`);
|
||
});
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
//Остановка обработки очереди исходящих сообщений
|
||
stopProcessing() {
|
||
//Выставляем флаг неработы
|
||
this.bWorking = false;
|
||
//Останавливаем WEB-сервер (если создавался)
|
||
if (this.srv) {
|
||
this.srv.close(() => {
|
||
//Оповещаем всхес, что остановились
|
||
this.notifyStopped();
|
||
});
|
||
} else {
|
||
//Сервер не создавался - просто оповещаем всех, что остановились
|
||
this.notifyStopped();
|
||
}
|
||
}
|
||
}
|
||
|
||
//-----------------
|
||
// Интерфейс модуля
|
||
//-----------------
|
||
|
||
exports.SEVT_IN_QUEUE_STARTED = SEVT_IN_QUEUE_STARTED;
|
||
exports.SEVT_IN_QUEUE_STOPPED = SEVT_IN_QUEUE_STOPPED;
|
||
exports.InQueue = InQueue;
|