/* Сервис интеграции ПП Парус 8 с WEB API Модуль ядра: отработка очереди входящих сообщений */ //------------------------------ // Подключение внешних библиотек //------------------------------ const _ = require("lodash"); //Работа с массивами и коллекциями const EventEmitter = require("events"); //Обработчик пользовательских событий const express = require("express"); //WEB-сервер Express const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса) const { ServerError } = require("./server_errors"); //Типовая ошибка const { makeErrorText, validateObject, buildURL, getAppSrvFunction } = require("./utils"); //Вспомогательные функции const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса const { SERR_OBJECT_BAD_INTERFACE, SERR_WEB_SERVER, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER } = require("./constants"); //Общесистемные константы //-------------------------- // Глобальные идентификаторы //-------------------------- //Типовые события const SEVT_IN_QUEUE_STARTED = "IN_QUEUE_STARTED"; //Обработчик очереди запущен const SEVT_IN_QUEUE_STOPPED = "IN_QUEUE_STOPPED"; //Обработчик очереди остановлен //------------ // Тело модуля //------------ //Класс очереди входящих сообщений class InQueue extends EventEmitter { //Конструктор класса constructor(prms) { //Создадим экземпляр родительского класса super(); //Проверяем структуру переданного объекта для подключения let sCheckResult = validateObject(prms, prmsInQueueSchema.InQueue, "Параметры конструктора класса InQueue"); //Если структура объекта в норме if (!sCheckResult) { //Список обслуживаемых сервисов this.services = null; //Признак функционирования обработчика this.bWorking = false; //Параметры очереди this.inComing = _.cloneDeep(prms.inComing); //Запомним подключение к БД this.dbConn = prms.dbConn; //Запомним логгер this.logger = prms.logger; //WEB-приложение this.webApp = express(); //WEB-сервер this.srv = null; } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Уведомление о запуске обработчика очереди notifyStarted() { //Оповестим подписчиков о запуске this.emit(SEVT_IN_QUEUE_STARTED, this.inComing.nPort); } //Уведомление об остановке обработчика очереди notifyStopped() { //Оповестим подписчиков об останове this.emit(SEVT_IN_QUEUE_STOPPED); } //Обработка сообщения async processMessage(prms) { //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, prmsInQueueSchema.processMessage, "Параметры функции обработки входящего сообщения" ); //Если структура объекта в норме if (!sCheckResult) { //Буфер для сообщения очереди let q = null; try { //Определимся с телом сообщения let blMsg = null; //Для POST сообщений - это тело запроса if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; } else { //Для GET - параметры запроса if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query)); } //Кладём сообщение в очередь q = await this.dbConn.putQueue({ nServiceFnId: prms.function.nId, blMsg }); //Скажем что пришло новое входящее сообщение await this.logger.info( `Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${ prms.function.sCode } (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, { nQueueId: q.nId } ); //Выполняем обработчик "До" (если он есть) if (prms.function.sAppSrvBefore) { //Выставим статус сообщению очереди - исполняется сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP }); //Выполняем const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); let resBefore = null; try { prms.queue = q; resBefore = await fnBefore(prms); } catch (e) { throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); } //Проверяем структуру ответа функции предобработки if (resBefore) { let sCheckResult = validateObject( resBefore, objInQueueSchema.InQueueProcessorFnBefore, "Результат функции предобработки входящего сообщения" ); //Если структура ответа в норме if (!sCheckResult) { //Выставим статус сообщению очереди - исполнено сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK }); //Фиксируем успех исполнения if (resBefore.blMsg) { q = await this.dbConn.setQueueAppSrvResult({ nQueueId: q.nId, blMsg: resBefore.blMsg, blResp: null }); } } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } } //Вызываем обработчик со стороны БД (если он есть) if (prms.function.sPrcResp) { //Фиксируем начало исполнения сервером БД - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB }); //Вызов обработчика БД q = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); //Выставим статус сообщению очереди - исполнено обработчиком БД q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK }); } //Выполняем обработчик "После" (если он есть) if (prms.function.sAppSrvAfter) { //Выставим статус сообщению очереди - исполняется сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP }); //Выполняем const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); let resAfter = null; try { prms.queue = q; resAfter = await fnAfter(prms); } catch (e) { throw new ServerError(SERR_APP_SERVER_AFTER, e.message); } //Проверяем структуру ответа функции предобработки if (resAfter) { let sCheckResult = validateObject( resAfter, objInQueueSchema.InQueueProcessorFnAfter, "Результат функции постобработки входящего сообщения" ); //Если структура ответа в норме if (!sCheckResult) { //Выставим статус сообщению очереди - исполнено сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK }); //Фиксируем успех исполнения q = await this.dbConn.setQueueAppSrvResult({ nQueueId: q.nId, blMsg: q.blMsg, blResp: resAfter.blResp }); } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } } //Всё успешно - отдаём результат клиенту prms.res.status(200).send(q.blResp); //Фиксируем успех обработки - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK }); //Фиксируем успех обработки - в протоколе работы сервиса await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); } catch (e) { //Если сообщение очереди успели создать if (q) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, sExecMsg: makeErrorText(e), nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR }); //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса await this.logger.error( `Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`, { nQueueId: q.nId } ); } else { //Ограничимся общей ошибкой await this.logger.error(makeErrorText(e), { nServiceId: prms.service.nId, nServiceFnId: prms.function.nId }); } //Отправим ошибку клиенту prms.res.status(500).send(makeErrorText(e)); } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Запуск обработки очереди входящих сообщений startProcessing(prms) { //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject( prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений" ); //Если структура объекта в норме if (!sCheckResult) { //Выставляем флаг работы this.bWorking = true; //запоминаем список обслуживаемых сервисов this.services = prms.services; //Конфигурируем сервер - обработка тела сообщения this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений _.forEach(_.filter(this.services, { nSrvType: objServiceSchema.NSRV_TYPE_RECIVE }), srvs => { //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает this.webApp.all(srvs.sSrvRoot, (req, res) => { res.status(200).send( `