/* Сервис интеграции ПП Парус 8 с WEB API Модуль ядра: отработка очереди входящих сообщений */ //------------------------------ // Подключение внешних библиотек //------------------------------ const _ = require("lodash"); //Работа с массивами и коллекциями const EventEmitter = require("events"); //Обработчик пользовательских событий const express = require("express"); //WEB-сервер Express const cors = require("cors"); //Управление заголовками безопасности для WEB-сервера Express const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса) const { ServerError } = require("./server_errors"); //Типовая ошибка const { makeErrorText, validateObject, buildURL, getAppSrvFunction, buildOptionsXML, parseOptionsXML, deepMerge, deepCopyObject, isUndefined, getKafkaConnectionSettings, getMQTTConnectionSettings, getURLProtocol } = require("./utils"); //Вспомогательные функции const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса const { SERR_OBJECT_BAD_INTERFACE, SERR_WEB_SERVER, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER, SERR_DB_SERVER, SERR_UNAUTH } = require("./constants"); //Общесистемные константы const { subscribeMQTT } = require("./mqtt_connector"); //Модуль для работы с MQTT const { subscribeKafka } = require("./kafka_connector"); //Модуль для работы с Kafka //-------------------------- // Глобальные идентификаторы //-------------------------- //Типовые события const SEVT_IN_QUEUE_STARTED = "IN_QUEUE_STARTED"; //Обработчик очереди запущен const SEVT_IN_QUEUE_STOPPED = "IN_QUEUE_STOPPED"; //Обработчик очереди остановлен //------------ // Тело модуля //------------ //Класс очереди входящих сообщений class InQueue extends EventEmitter { //Конструктор класса constructor(prms) { //Создадим экземпляр родительского класса super(); //Проверяем структуру переданного объекта для подключения let sCheckResult = validateObject(prms, prmsInQueueSchema.InQueue, "Параметры конструктора класса InQueue"); //Если структура объекта в норме if (!sCheckResult) { //Общие параметры сервера приложений this.common = _.cloneDeep(prms.common); //Список обслуживаемых сервисов this.services = null; //Признак функционирования обработчика this.bWorking = false; //Параметры очереди this.inComing = _.cloneDeep(prms.inComing); //Запомним подключение к БД this.dbConn = prms.dbConn; //Запомним логгер this.logger = prms.logger; //Запомним уведомитель this.notifier = prms.notifier; //WEB-приложение this.webApp = express(); this.webApp.use(cors()); this.webApp.options("*", cors()); //WEB-сервер this.srv = null; //Параметры подключения к Kafka this.kafka = prms.kafka; //Параметры подключения к MQTT this.mqtt = prms.mqtt; //Внешние подключения this.kafkaConnections = []; this.mqttConnections = []; } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Уведомление о запуске обработчика очереди notifyStarted() { //Оповестим подписчиков о запуске this.emit(SEVT_IN_QUEUE_STARTED, this.inComing.nPort, this.inComing.sHost); } //Уведомление об остановке обработчика очереди notifyStopped() { //Оповестим подписчиков об останове this.emit(SEVT_IN_QUEUE_STOPPED); } //Обработка сообщения HTTP/HTTPS async processMessage(prms) { //Проверяем структуру переданного объекта для обработки let sCheckResult = validateObject(prms, prmsInQueueSchema.processMessage, "Параметры функции обработки входящего сообщения"); //Если структура объекта в норме if (!sCheckResult) { //Буфер для сообщения очереди let q = null; try { //Тело сообщения и ответ на него let blMsg = null; let blResp = null; //Параметры сообщения и ответа на него let options = {}; let optionsResp = {}; //Флаг прекращения обработки сообщения let bStopPropagation = false; //Определимся с телом сообщения - для POST, PATCH и PUT сообщений - это тело запроса if ( [objServiceFnSchema.NFN_PRMS_TYPE_POST, objServiceFnSchema.NFN_PRMS_TYPE_PATCH, objServiceFnSchema.NFN_PRMS_TYPE_PUT].includes( prms.function.nFnPrmsType ) ) { blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; } else { //Для GET, HEAD, DELETE, CONNECT, OPTIONS и TRACE - параметры запроса if (!_.isEmpty(prms.req.query)) blMsg = Buffer.from(JSON.stringify(prms.req.query)); } //Определимся с параметрами сообщения полученными от внешней системы options = { method: prms.req.method, qs: _.cloneDeep(prms.req.query), headers: _.cloneDeep(prms.req.headers), ip: prms.req.ip, hostName: prms.req.hostname, protocol: prms.req.protocol, originalUrl: prms.req.originalUrl, path: prms.req.path }; //Кладём сообщение в очередь q = await this.dbConn.putQueue({ nServiceFnId: prms.function.nId, sOptions: buildOptionsXML({ options }), blMsg }); //Скажем что пришло новое входящее сообщение await this.logger.info( `Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, { nQueueId: q.nId } ); //Выполняем обработчик "До" (если он есть) if (prms.function.sAppSrvBefore) { //Выставим статус сообщению очереди - исполняется сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP }); //Выполняем const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); let resBefore = null; try { let resBeforePrms = _.cloneDeep(prms); resBeforePrms.queue = _.cloneDeep(q); resBeforePrms.queue.blMsg = blMsg; resBeforePrms.queue.blResp = blResp; resBeforePrms.options = _.cloneDeep(options); resBeforePrms.dbConn = this.dbConn; resBeforePrms.notifier = this.notifier; resBeforePrms.res = prms.res; resBefore = await fnBefore(resBeforePrms); } catch (e) { throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); } //Проверяем структуру ответа функции предобработки if (resBefore) { let sCheckResult = validateObject( resBefore, objInQueueSchema.InQueueProcessorFnBefore, "Результат функции предобработки входящего сообщения" ); //Если структура ответа в норме if (!sCheckResult) { //Выставим статус сообщению очереди - исполнено сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK }); //Фиксируем результат исполнения "До" - обработанный запрос внешней системы if (!_.isUndefined(resBefore.blMsg)) { blMsg = resBefore.blMsg; q = await this.dbConn.setQueueMsg({ nQueueId: q.nId, blMsg }); } //Фиксируем результат исполнения "До" - ответ на запрос if (!_.isUndefined(resBefore.blResp)) { blResp = resBefore.blResp; q = await this.dbConn.setQueueResp({ nQueueId: q.nId, blResp, nIsOriginal: NIS_ORIGINAL_YES }); } //Фиксируем результат исполнения "До" - параметры ответа на запрос if (!_.isUndefined(resBefore.optionsResp)) { optionsResp = deepMerge(optionsResp, resBefore.optionsResp); let sOptionsResp = buildOptionsXML({ options: optionsResp }); q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); } //Фиксируем результат исполнения "До" - флаг ошибочной аутентификации - если он поднят, то это ошибка, дальше ничего не делаем if (!_.isUndefined(resBefore.bUnAuth) && resBefore.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); //Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } } //Вызываем обработчик со стороны БД (если он есть) if (bStopPropagation === false && prms.function.sPrcResp) { //Фиксируем начало исполнения сервером БД - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB }); //Вызов обработчика БД let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); //Если результат - ошибка пробрасываем её if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); //Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); //Выставим статус сообщению очереди - исполнено обработчиком БД q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK }); //Считаем ответ полученный от системы let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId }); blResp = qData.blResp; //Запомним параметры ответа внешней системе, если обработчик их вернул if (prcRes.sOptionsResp) { try { let optionsRespTmp = await parseOptionsXML({ sOptions: prcRes.sOptionsResp }); optionsResp = deepMerge(optionsResp, optionsRespTmp); } catch (e) { await logger.warn( `Указанные для сообщения параметры ответа имеют некорректный формат - использую параметры по умолчанию. Ошибка парсера: ${makeErrorText( e )}`, { nQueueId: prms.queue.nId } ); } } } //Выполняем обработчик "После" (если он есть) if (bStopPropagation === false && prms.function.sAppSrvAfter) { //Выставим статус сообщению очереди - исполняется сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP }); //Выполняем const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); let resAfter = null; try { let resAfterPrms = _.cloneDeep(prms); resAfterPrms.queue = _.cloneDeep(q); resAfterPrms.queue.blMsg = blMsg; resAfterPrms.queue.blResp = blResp; resAfterPrms.options = _.cloneDeep(options); resAfterPrms.optionsResp = _.cloneDeep(optionsResp); resAfterPrms.dbConn = this.dbConn; resAfterPrms.notifier = this.notifier; resAfter = await fnAfter(resAfterPrms); } catch (e) { throw new ServerError(SERR_APP_SERVER_AFTER, e.message); } //Проверяем структуру ответа функции предобработки if (resAfter) { let sCheckResult = validateObject( resAfter, objInQueueSchema.InQueueProcessorFnAfter, "Результат функции постобработки входящего сообщения" ); //Если структура ответа в норме if (!sCheckResult) { //Выставим статус сообщению очереди - исполнено сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK }); //Фиксируем результат исполнения "После" - ответ системы if (!_.isUndefined(resAfter.blResp)) { blResp = resAfter.blResp; q = await this.dbConn.setQueueResp({ nQueueId: q.nId, blResp, nIsOriginal: NIS_ORIGINAL_NO }); } //Фиксируем результат исполнения "После" - параметры ответа на запрос if (!_.isUndefined(resAfter.optionsResp)) { optionsResp = deepMerge(optionsResp, resAfter.optionsResp); let sOptionsResp = buildOptionsXML({ options: optionsResp }); q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); } //Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } } //Всё успешно - отдаём результат клиенту if (bStopPropagation === false) { if (optionsResp.headers) prms.res.set(optionsResp.headers); prms.res.status(optionsResp.statusCode || 200).send(blResp); } //Фиксируем успех обработки - в протоколе работы сервиса await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); //Фиксируем успех обработки - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK }); } catch (e) { //Тема и текст уведомления об ошибке let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`; let sMessage = makeErrorText(e); //Если сообщение очереди успели создать if (q) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, sExecMsg: sMessage, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR }); //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId }); //Добавим чуть больше информации в тему сообщения sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`; } else { //Ограничимся общей ошибкой await this.logger.error(sMessage, { nServiceId: prms.service.nId, nServiceFnId: prms.function.nId }); } //Если для функции-обработчика указан признак необходимости оповещения об ошибках if (prms.function.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) { //Отправим уведомление об ошибке отработки в почту await this.notifier.addMessage({ sTo: prms.function.sErrNtfMail, sSubject, sMessage }); } //Отправим ошибку клиенту prms.res.status(500).send(makeErrorText(e)); } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Обработка MQ сообщения async processMQMessage({ message, service, fn, sProtocol }) { //Буфер для сообщения очереди let q = null; try { //Префикс протокола let sProtocolPrefix = sProtocol === objServiceSchema.SPROTOCOL_KAFKA ? "Kafka" : "MQTT"; //Тело сообщения let blMsg = null; //Параметры сообщения let options = {}; //Флаг прекращения обработки сообщения let bStopPropagation = false; //Получим тело сообщения blMsg = message.value ? message.value : null; //Определимся с параметрами сообщения полученными от внешней системы options = { method: fn.sFnPrmsType, headers: message.headers }; //Кладём сообщение в очередь q = await this.dbConn.putQueue({ nServiceFnId: fn.nId, sOptions: buildOptionsXML({ options }), blMsg }); //Скажем что пришло новое входящее сообщение await this.logger.info( `Новое входящее ${sProtocolPrefix}-сообщение для функции ${fn.sCode} (${buildURL({ sSrvRoot: service.sSrvRoot, sFnURL: fn.sFnURL })})`, { nQueueId: q.nId } ); //Выполняем обработчик "До" (если он есть) if (fn.sAppSrvBefore) { //Выставим статус сообщению очереди - исполняется сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP }); //Выполняем const fnBefore = getAppSrvFunction(fn.sAppSrvBefore); let resBefore = null; try { let resBeforePrms = { service: service, function: fn }; resBeforePrms.queue = deepCopyObject(q); resBeforePrms.queue.blMsg = blMsg; resBeforePrms.options = deepCopyObject(options); resBeforePrms.dbConn = this.dbConn; resBeforePrms.notifier = this.notifier; resBefore = await fnBefore(resBeforePrms); } catch (e) { throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); } //Проверяем структуру ответа функции предобработки if (resBefore) { let sCheckResult = validateObject( resBefore, objInQueueSchema.InQueueProcessorFnBefore, "Результат функции предобработки входящего сообщения" ); //Если структура ответа в норме if (!sCheckResult) { //Выставим статус сообщению очереди - исполнено сервером приложений q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK }); //Фиксируем результат исполнения "До" - обработанный запрос внешней системы if (!isUndefined(resBefore.blMsg)) { blMsg = resBefore.blMsg; q = await this.dbConn.setQueueMsg({ nQueueId: q.nId, blMsg }); } //Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем if (!isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } } //Вызываем обработчик со стороны БД (если он есть) if (bStopPropagation === false && fn.sPrcResp) { //Фиксируем начало исполнения сервером БД - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB }); //Вызов обработчика БД let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); //Если результат - ошибка пробрасываем её if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); //Выставим статус сообщению очереди - исполнено обработчиком БД q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK }); } //Фиксируем успех обработки - в протоколе работы сервиса await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); //Фиксируем успех обработки - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK }); } catch (e) { //Тема и текст уведомления об ошибке let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`; let sMessage = makeErrorText(e); //Если сообщение очереди успели создать if (q) { //Фиксируем ошибку обработки сервером приложений - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, sExecMsg: sMessage, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR }); //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId }); //Добавим чуть больше информации в тему сообщения sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`; } else { //Ограничимся общей ошибкой await this.logger.error(sMessage, { nServiceId: service.nId, nServiceFnId: fn.nId }); } //Если для функции-обработчика указан признак необходимости оповещения об ошибках if (fn.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) { //Отправим уведомление об ошибке отработки в почту await this.notifier.addMessage({ sTo: fn.sErrNtfMail, sSubject, sMessage }); } } } //Запуск обработки очереди входящих сообщений async startProcessing(prms) { //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений"); //Если структура объекта в норме if (!sCheckResult) { //Выставляем флаг работы this.bWorking = true; //запоминаем список обслуживаемых сервисов this.services = prms.services; //Конфигурируем сервер - установка MIME-типа входного сообщения по умолчанию this.webApp.use((req, res, next) => { req.headers["content-type"] = req.headers["content-type"] || "application/octet-stream"; if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream"; next(); }); //Конфигурируем сервер - обработка тела сообщения this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений _.forEach( _.filter(this.services, srv => { return ( srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && [objServiceSchema.SPROTOCOL_HTTP, objServiceSchema.SPROTOCOL_HTTPS].includes(getURLProtocol(srv.sSrvRoot)) ); }), srvs => { //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает this.webApp.all(srvs.sSrvRoot, (req, res) => { res.status(200).send( `

Сервер приложений ПП Парус 8
(${this.common.sVersion} релиз ${this.common.sRelease})

Сервис: ${srvs.sName}

` ); }); //Для всех статических функций сервиса... _.forEach( _.filter(srvs.functions, fn => fn.sFnURL.startsWith("@")), fn => { this.webApp.use( buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }), express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`) ); } ); //Для всех функций сервиса (кроме статических)... _.forEach( _.filter(srvs.functions, fn => !fn.sFnURL.startsWith("@")), fn => { //...собственный обработчик, в зависимости от указанного способа передачи параметров this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => { try { //Вызываем обработчик await this.processMessage({ req, res, service: srvs, function: fn }); } catch (e) { //Протоколируем в журнал работы сервера await this.logger.error(makeErrorText(e), { nServiceId: srvs.nId, nServiceFnId: fn.nId }); //Отправим ошибку клиенту res.status(500).send(makeErrorText(e)); } }); //...и собственный обработчик ошибок this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => { //Протоколируем в журнал работы сервера await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { nServiceId: srvs.nId, nServiceFnId: fn.nId }); //Отправим ошибку клиенту res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); }); } ); } ); //Инициализируем настройки подключения let connectionSettings = null; //Считываем прием сообщений по Kafka let kafkaSrvs = this.services.filter(srv => { return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && getURLProtocol(srv.sSrvRoot) === objServiceSchema.SPROTOCOL_KAFKA; }); //Если есть сервисы с приемом сообщений по Kafka if (kafkaSrvs.length !== 0) { //Обходим данные сервисы for (let srv of kafkaSrvs) { //Если у сервиса обмена есть функции if (srv.functions.length !== 0) { //Считываем настройки подключения к Kafka connectionSettings = getKafkaConnectionSettings(srv.sCode, this.kafka); //Если настройки подключения считаны if (connectionSettings) { //Подключаемся и подписываемся на соответствующий брокер let connectionKafka = await subscribeKafka({ settings: connectionSettings, service: srv, processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_KAFKA }), logger: this.logger }); //Если подключение было создано if (connectionKafka) { //Добавляем в общий список подключений kafka this.kafkaConnections.push(connectionKafka); } } else { await this.logger.error( `Ошибка получения настроек подключения к Kafka для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").` ); } } } } //Считываем прием сообщений по MQTT let mqttSrvs = this.services.filter(srv => { return ( srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(getURLProtocol(srv.sSrvRoot)) ); }); //Если есть сервисы с приемом сообщений по MQTT if (mqttSrvs.length !== 0) { //Обходим данные сервисы for (let srv of mqttSrvs) { //Если у сервиса обмена есть функции if (srv.functions.length !== 0) { //Считываем настройки подключения к MQTT connectionSettings = getMQTTConnectionSettings(srv.sCode, this.mqtt); //Если настройки подключения считаны if (connectionSettings) { //Подключаемся и подписываемся на соответствующий брокер let connectionMQTT = await subscribeMQTT({ settings: connectionSettings, service: srv, processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_MQTT }), logger: this.logger }); //Если подключение было создано if (connectionMQTT) { //Добавляем в общий список подключений kafka this.mqttConnections.push(connectionMQTT); } } else { await this.logger.error( `Ошибка получения настроек подключения к MQTT для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").` ); } } } } //Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND this.webApp.use("*", (req, res) => { res.status(404).send( `

Сервер приложений ПП Парус 8
(${this.common.sVersion} релиз ${this.common.sRelease})

Запрошенный адрес не найден

` ); }); //Ошибки, не отработанные индивидуальными обработчиками - 500 SERVER ERROR this.webApp.use(async (err, req, res, next) => { //Протоколируем в журнал работы сервера await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); //Отправим ошибку клиенту res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); }); //Запускаем сервер this.srv = this.webApp.listen(this.inComing.nPort, this.inComing.sHost, () => { //И оповещаем всех что запустились this.notifyStarted(); }); this.srv.on("error", e => { throw new ServerError(e.code, `Фатальная ошибка обработчика очереди входящих сообщений: ${e.message}`); }); } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } //Закрытие подключений stopConnections() { //Если у нас есть соединения с MQTT if (this.mqttConnections.length !== 0) { //Закрываем их for (let connection of this.mqttConnections) { try { connection.end(); } catch (e) { this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`); } } } //Если у нас есть соединения с Kafka if (this.kafkaConnections.length !== 0) { //Закрываем их for (let connection of this.kafkaConnections) { try { connection.disconnect(); } catch (e) { this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`); } } } } //Остановка обработки очереди исходящих сообщений stopProcessing() { //Выставляем флаг неработы this.bWorking = false; //Закрываем подключения, если они есть this.stopConnections(); //Останавливаем WEB-сервер (если создавался) if (this.srv) { this.srv.close(() => { //Оповещаем всхес, что остановились this.notifyStopped(); }); } else { //Сервер не создавался - просто оповещаем всех, что остановились this.notifyStopped(); } } } //----------------- // Интерфейс модуля //----------------- exports.SEVT_IN_QUEUE_STARTED = SEVT_IN_QUEUE_STARTED; exports.SEVT_IN_QUEUE_STOPPED = SEVT_IN_QUEUE_STOPPED; exports.InQueue = InQueue;