From 6ad825c2b6390672c9b15831f0a01d1d5f0db3b0 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Tue, 24 Sep 2024 17:58:57 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-901=20(=D0=94=D0=BE?= =?UTF-8?q?=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BF=D0=BE?= =?UTF-8?q?=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B8=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=BE=D0=B2=20MQTT=20=D0=B8=20KA?= =?UTF-8?q?FKA)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 66 ++++++------ config_default.js | 64 +++++++----- core/app.js | 10 +- core/in_queue.js | 185 +++++++++++++++++++-------------- core/kafka_connector.js | 41 ++++---- core/mqtt_connector.js | 32 +++--- core/out_queue.js | 8 +- core/out_queue_processor.js | 52 ++++++--- core/utils.js | 54 ++++++++-- models/obj_service.js | 45 ++++---- models/obj_service_function.js | 12 --- 11 files changed, 337 insertions(+), 232 deletions(-) diff --git a/config.js b/config.js index edea45f..c86ddc7 100644 --- a/config.js +++ b/config.js @@ -72,34 +72,42 @@ let inComing = { }; //Параметры подключения к Kafka -let kafkaConnection = { - //ID клиента-отправителя - sClientIdSender: "Parus", - //ID клиента-получателя - sClientIdRecipient: "Parus", - //Группа получателя - sGroupId: "Parus", - //Время ожидания успешного подключения (мс) - nConnectionTimeout: 5000, - //Необходимость попытки переподключения при потере соединения - bRestartOnFailure: false, - //Время максимального ожидания между попытками переподключения (мс) - nMaxRetryTime: 20000, - //Время ожидания между попытками переподключения (мс) - nInitialRetryTime: 10000 -}; +const kafka = [ + { + //Мнемокод сервиса обмена (пусто - использовать по умолчанию) + sService: "", + //ID клиента-отправителя + sClientIdSender: "Parus", + //ID клиента-получателя + sClientIdRecipient: "Parus", + //Группа получателя + sGroupId: "Parus", + //Время ожидания успешного подключения (мс) + nConnectionTimeout: 5000, + //Необходимость попытки переподключения при потере соединения + bRestartOnFailure: true, + //Время максимального ожидания между попытками переподключения (мс) + nMaxRetryTime: 20000, + //Время ожидания между попытками переподключения (мс) + nInitialRetryTime: 10000 + } +]; -//Параметры подключения по MQTT протоколу -let mqttConnection = { - //ID клиента-отправителя - sClientIdSender: "Parus", - //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться) - sClientIdRecipient: "ParusRecipient", - //Время ожидания успешного подключения (мс) - nConnectTimeout: 5000, - //Время ожидания между попытками переподключения (мс) - nReconnectPeriod: 10000 -}; +//Параметры подключения к MQTT +const mqtt = [ + { + //Мнемокод сервиса обмена (пусто - использовать по умолчанию) + sService: "", + //ID клиента-отправителя + sClientIdSender: "Parus", + //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться) + sClientIdRecipient: "ParusRecipient", + //Время ожидания успешного подключения (мс) + nConnectTimeout: 5000, + //Время ожидания между попытками переподключения (мс) + nReconnectPeriod: 10000 + } +]; //Параметры отправки E-Mail уведомлений let mail = { @@ -128,7 +136,7 @@ module.exports = { dbConnect, outGoing, inComing, - kafkaConnection, - mqttConnection, + kafka, + mqtt, mail }; diff --git a/config_default.js b/config_default.js index 1d5007f..c289e71 100644 --- a/config_default.js +++ b/config_default.js @@ -72,32 +72,42 @@ let inComing = { }; //Параметры подключения к Kafka -let kafkaConnection = { - //ID клиента-отправителя - sClientIdSender: "Parus", - //ID клиента-получателя - sClientIdRecipient: "Parus", - //Время ожидания успешного подключения (мс) - nConnectionTimeout: 5000, - //Необходимость попытки переподключения при потере соединения - bRestartOnFailure: false, - //Время максимального ожидания между попытками переподключения (мс) - nMaxRetryTime: 20000, - //Время ожидания между попытками переподключения (мс) - nInitialRetryTime: 10000 -}; +const kafka = [ + { + //Мнемокод сервиса обмена (пусто - использовать по умолчанию) + sService: "", + //ID клиента-отправителя + sClientIdSender: "Parus", + //ID клиента-получателя + sClientIdRecipient: "Parus", + //Группа получателя + sGroupId: "Parus", + //Время ожидания успешного подключения (мс) + nConnectionTimeout: 5000, + //Необходимость попытки переподключения при потере соединения + bRestartOnFailure: true, + //Время максимального ожидания между попытками переподключения (мс) + nMaxRetryTime: 20000, + //Время ожидания между попытками переподключения (мс) + nInitialRetryTime: 10000 + } +]; -//Параметры подключения по MQTT протоколу -let mqttConnection = { - //ID клиента-отправителя - sClientIdSender: "Parus", - //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться) - sClientIdRecipient: "ParusRecipient", - //Время ожидания успешного подключения (мс) - nConnectTimeout: 5000, - //Время ожидания между попытками переподключения (мс) - nReconnectPeriod: 10000 -}; +//Параметры подключения к MQTT +const mqtt = [ + { + //Мнемокод сервиса обмена (пусто - использовать по умолчанию) + sService: "", + //ID клиента-отправителя + sClientIdSender: "Parus", + //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться) + sClientIdRecipient: "ParusRecipient", + //Время ожидания успешного подключения (мс) + nConnectTimeout: 5000, + //Время ожидания между попытками переподключения (мс) + nReconnectPeriod: 10000 + } +]; //Параметры отправки E-Mail уведомлений let mail = { @@ -126,7 +136,7 @@ module.exports = { dbConnect, outGoing, inComing, - kafkaConnection, - mqttConnection, + kafka, + mqtt, mail }; diff --git a/core/app.js b/core/app.js index cf406d5..c6d096e 100644 --- a/core/app.js +++ b/core/app.js @@ -96,7 +96,7 @@ class ParusAppServer { //Запускаем обслуживание очереди входящих await this.logger.info("Запуск обработчика очереди входящих сообщений..."); try { - this.inQ.startProcessing({ services: this.services }); + await this.inQ.startProcessing({ services: this.services }); } catch (e) { await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); await this.stop(); @@ -218,8 +218,8 @@ class ParusAppServer { logger: this.logger, notifier: this.notifier, sProxy: prms.config.outGoing.sProxy, - kafkaConnectionPrms: prms.config.kafkaConnection, - mqttConnectionPrms: prms.config.mqttConnection + kafka: prms.config.kafka, + mqtt: prms.config.mqtt }); //Создаём обработчик очереди входящих this.inQ = new iq.InQueue({ @@ -228,8 +228,8 @@ class ParusAppServer { dbConn: this.dbConn, logger: this.logger, notifier: this.notifier, - kafkaConnectionPrms: prms.config.kafkaConnection, - mqttConnectionPrms: prms.config.mqttConnection + kafka: prms.config.kafka, + mqtt: prms.config.mqtt }); //Создаём контроллер доступности удалённых сервисов this.srvAvlCtrl = new sac.ServiceAvailableController({ diff --git a/core/in_queue.js b/core/in_queue.js index 316ecc3..ce20127 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -21,7 +21,9 @@ const { buildOptionsXML, parseOptionsXML, deepMerge, - getKafkaBroker + getKafkaConnectionSettings, + getMQTTConnectionSettings, + getURLProtocol } = require("./utils"); //Вспомогательные функции const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений @@ -83,9 +85,9 @@ class InQueue extends EventEmitter { //WEB-сервер this.srv = null; //Параметры подключения к Kafka - this.kafkaConnectionPrms = _.cloneDeep(prms.kafkaConnectionPrms); + this.kafka = prms.kafka; //Параметры подключения к MQTT - this.mqttConnectionPrms = _.cloneDeep(prms.mqttConnectionPrms); + this.mqtt = prms.mqtt; //Внешние подключения this.kafkaConnections = []; this.mqttConnections = []; @@ -697,7 +699,7 @@ class InQueue extends EventEmitter { } //Запуск обработки очереди входящих сообщений - startProcessing(prms) { + async startProcessing(prms) { //Проверяем структуру переданного объекта для старта let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений"); //Если структура объекта в норме @@ -715,106 +717,135 @@ class InQueue extends EventEmitter { //Конфигурируем сервер - обработка тела сообщения this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений - _.forEach(_.filter(this.services, { nSrvType: objServiceSchema.NSRV_TYPE_RECIVE }), srvs => { - //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает - this.webApp.all(srvs.sSrvRoot, (req, res) => { - res.status(200).send( - `

Сервер приложений ПП Парус 8
(${this.common.sVersion} релиз ${this.common.sRelease})

Сервис: ${srvs.sName}

` + _.forEach( + _.filter(this.services, srv => { + return ( + srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && + [objServiceSchema.SPROTOCOL_HTTP, objServiceSchema.SPROTOCOL_HTTPS].includes(getURLProtocol(srv.sSrvRoot)) ); - }); - //Для всех статических функций сервиса... - _.forEach( - _.filter(srvs.functions, fn => fn.sFnURL.startsWith("@")), - fn => { - this.webApp.use( - buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }), - express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`) + }), + srvs => { + //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает + this.webApp.all(srvs.sSrvRoot, (req, res) => { + res.status(200).send( + `

Сервер приложений ПП Парус 8
(${this.common.sVersion} релиз ${this.common.sRelease})

Сервис: ${srvs.sName}

` ); - } - ); - //Для всех функций сервиса (кроме статических)... - _.forEach( - _.filter(srvs.functions, fn => !fn.sFnURL.startsWith("@")), - fn => { - //...собственный обработчик, в зависимости от указанного способа передачи параметров - this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => { - try { - //Вызываем обработчик - await this.processMessage({ req, res, service: srvs, function: fn }); - } catch (e) { + }); + //Для всех статических функций сервиса... + _.forEach( + _.filter(srvs.functions, fn => fn.sFnURL.startsWith("@")), + fn => { + this.webApp.use( + buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }), + express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`) + ); + } + ); + //Для всех функций сервиса (кроме статических)... + _.forEach( + _.filter(srvs.functions, fn => !fn.sFnURL.startsWith("@")), + fn => { + //...собственный обработчик, в зависимости от указанного способа передачи параметров + this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => { + try { + //Вызываем обработчик + await this.processMessage({ req, res, service: srvs, function: fn }); + } catch (e) { + //Протоколируем в журнал работы сервера + await this.logger.error(makeErrorText(e), { + nServiceId: srvs.nId, + nServiceFnId: fn.nId + }); + //Отправим ошибку клиенту + res.status(500).send(makeErrorText(e)); + } + }); + //...и собственный обработчик ошибок + this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => { //Протоколируем в журнал работы сервера - await this.logger.error(makeErrorText(e), { + await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { nServiceId: srvs.nId, nServiceFnId: fn.nId }); //Отправим ошибку клиенту - res.status(500).send(makeErrorText(e)); - } - }); - //...и собственный обработчик ошибок - this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => { - //Протоколируем в журнал работы сервера - await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { - nServiceId: srvs.nId, - nServiceFnId: fn.nId + res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); }); - //Отправим ошибку клиенту - res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); - }); - } - ); - }); + } + ); + } + ); + //Инициализируем настройки подключения + let connectionSettings = null; //Считываем прием сообщений по Kafka - let kafkaSrvs = _.filter(this.services, srv => { - return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && srv.sSrvRoot.startsWith("kafka://"); + let kafkaSrvs = this.services.filter(srv => { + return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && getURLProtocol(srv.sSrvRoot) === objServiceSchema.SPROTOCOL_KAFKA; }); //Если есть сервисы с приемом сообщений по Kafka if (kafkaSrvs.length !== 0) { //Обходим данные сервисы - _.forEach(kafkaSrvs, async srvs => { + for (let srv of kafkaSrvs) { //Если у сервиса обмена есть функции - if (srvs.functions.length !== 0) { - //Подключаемся и подписываемся на соответствующий брокер - let connectionKafka = await subscribeKafka({ - connectionPrms: this.kafkaConnectionPrms, - service: srvs, - processKafkaMessage: prms => this.processKafkaMessage(prms), - logger: this.logger - }); - //Если подключение было создано - if (connectionKafka) { - //Добавляем в общий список подключений kafka - this.kafkaConnections.push(connectionKafka); + if (srv.functions.length !== 0) { + //Считываем настройки подключения к Kafka + connectionSettings = getKafkaConnectionSettings(srv.sCode, this.kafka); + //Если настройки подключения считаны + if (connectionSettings) { + //Подключаемся и подписываемся на соответствующий брокер + let connectionKafka = await subscribeKafka({ + settings: connectionSettings, + service: srv, + processKafkaMessage: prms => this.processKafkaMessage(prms), + logger: this.logger + }); + //Если подключение было создано + if (connectionKafka) { + //Добавляем в общий список подключений kafka + this.kafkaConnections.push(connectionKafka); + } + } else { + await this.logger.error( + `Ошибка получения настроек подключения к Kafka для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").` + ); } } - }); + } } //Считываем прием сообщений по MQTT - let mqttSrvs = _.filter(this.services, srv => { + let mqttSrvs = this.services.filter(srv => { return ( - srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && (srv.sSrvRoot.startsWith("mqtt://") || srv.sSrvRoot.startsWith("mqtts://")) + srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && + [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(getURLProtocol(srv.sSrvRoot)) ); }); //Если есть сервисы с приемом сообщений по MQTT if (mqttSrvs.length !== 0) { //Обходим данные сервисы - _.forEach(mqttSrvs, async srvs => { + for (let srv of mqttSrvs) { //Если у сервиса обмена есть функции - if (srvs.functions.length !== 0) { - //Подключаемся и подписываемся на соответствующий брокер - let connectionMQTT = await subscribeMQTT({ - connectionPrms: this.mqttConnectionPrms, - service: srvs, - processMQTTMessage: prms => this.processMQTTMessage(prms), - logger: this.logger - }); - //Если подключение было создано - if (connectionMQTT) { - //Добавляем в общий список подключений kafka - this.mqttConnections.push(connectionMQTT); + if (srv.functions.length !== 0) { + //Считываем настройки подключения к MQTT + connectionSettings = getMQTTConnectionSettings(srv.sCode, this.mqtt); + //Если настройки подключения считаны + if (connectionSettings) { + //Подключаемся и подписываемся на соответствующий брокер + let connectionMQTT = await subscribeMQTT({ + settings: connectionSettings, + service: srv, + processMQTTMessage: prms => this.processMQTTMessage(prms), + logger: this.logger + }); + //Если подключение было создано + if (connectionMQTT) { + //Добавляем в общий список подключений kafka + this.mqttConnections.push(connectionMQTT); + } + } else { + await this.logger.error( + `Ошибка получения настроек подключения к MQTT для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").` + ); } } - }); + } } //Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND this.webApp.use("*", (req, res) => { diff --git a/core/kafka_connector.js b/core/kafka_connector.js index be09621..5504fa6 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -7,7 +7,6 @@ // Подключение библиотек //---------------------- -const _ = require("lodash"); //Работа с массивами и коллекциями const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka @@ -16,12 +15,12 @@ const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka //------------ //Отправка сообщения Kafka -const publishKafka = async ({ connectionPrms, url, auth, topic, message }) => { +const publishKafka = async ({ settings, url, auth, topic, message }) => { //Иницализируем подключение к Kafka let kafka = new Kafka({ - clientId: connectionPrms.sClientIdSender, + clientId: settings.sClientIdSender, brokers: [url], - connectionTimeout: connectionPrms.nConnectionTimeout, + connectionTimeout: settings.nConnectionTimeout, logLevel: logLevel.NOTHING, ...auth }); @@ -30,15 +29,15 @@ const publishKafka = async ({ connectionPrms, url, auth, topic, message }) => { //Подключаемся к Kafka await producer.connect(); //Отправляем сообщение - let res = await producer.send({ topic: topic, messages: [{ value: message }] }); + await producer.send({ topic: topic, messages: [{ value: message }] }); //Отключаемся await producer.disconnect(); - //Возвращаем ответ - return res; + //Возвращаем статус успешной отправки + return { statusCode: 200 }; }; //Получение MQTT сообщений -const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, logger }) => { +const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => { try { //Признак необходимости вывода сообщения о потере соединения let bLogLostConnection = true; @@ -46,15 +45,15 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo let sBroker = getKafkaBroker(service.sSrvRoot); //Иницализируем подключение к Kafka let client = new Kafka({ - clientId: connectionPrms.sClientIdRecipient, + clientId: settings.sClientIdRecipient, brokers: [sBroker], - connectionTimeout: connectionPrms.nConnectionTimeout, + connectionTimeout: settings.nConnectionTimeout, ...getKafkaAuth(service.sSrvUser, service.sSrvPass), logLevel: logLevel.NOTHING, retry: { retries: 0, - maxRetryTime: connectionPrms.nMaxRetryTime, - initialRetryTime: connectionPrms.nInitialRetryTime, + maxRetryTime: settings.nMaxRetryTime, + initialRetryTime: settings.nInitialRetryTime, restartOnFailure: error => { return new Promise(resolve => { //Если требуется вывести ошибку @@ -64,19 +63,20 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo //Сбрасываем признак необходимости вывода ошибки bLogLostConnection = false; } - resolve(connectionPrms.bRestartOnFailure); + resolve(settings.bRestartOnFailure); }); } } }); - //Инициализируем получателя - let consumer = client.consumer({ groupId: connectionPrms.sGroupId }); - + let consumer = client.consumer({ groupId: settings.sGroupId }); //Устанавливаем прослушивание await consumer.connect(); - consumer.subscribe({ topics: _.map(service.functions, "sFnURL") }); - + consumer.subscribe({ + topics: service.functions.map(fn => { + return fn.sFnURL; + }) + }); //Запускаем прослушивание необходимых топиков consumer.run({ eachMessage: async ({ topic, message }) => { @@ -85,14 +85,15 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo processKafkaMessage({ message, service, - fn: _.find(service.functions, { sFnURL: topic }) + fn: service.functions.find(fn => { + return fn.sFnURL === topic; + }) }); } catch (e) { await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`); } } }); - //Отслеживаем соединение consumer.on(consumer.events.CONNECT, () => { //Если сообщение о потере соединения уже выводилось diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index f63f226..17915f5 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -7,7 +7,6 @@ // Подключение библиотек //---------------------- -const _ = require("lodash"); //Работа с массивами и коллекциями const { makeErrorText } = require("./utils"); //Вспомогательные функции const mqtt = require("mqtt"); //Работа с MQTT @@ -16,48 +15,52 @@ const mqtt = require("mqtt"); //Работа с MQTT //------------ //Отправка MQTT сообщения -const publishMQTT = async ({ connectionPrms, url, auth, topic, message }) => { +const publishMQTT = async ({ settings, url, auth, topic, message }) => { //Инициализируем подключение const client = await mqtt.connectAsync(url, { - clientId: connectionPrms.sClientIdSender, + clientId: settings.sClientIdSender, clean: true, - connectTimeout: connectionPrms.nConnectTimeout, + connectTimeout: settings.nConnectTimeout, username: auth.user, password: auth.pass, - reconnectPeriod: connectionPrms.nReconnectPeriod + reconnectPeriod: settings.nReconnectPeriod }); //Отправляем сообщение await client.publishAsync(topic, message); //Закрываем подключение await client.endAsync(); - //Возвращаем сообщение, которое было отправлено + //Возвращаем статус успешной отправки return { statusCode: 200 }; }; //Получение MQTT сообщений -const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logger }) => { +const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => { try { //Инициализируем строку подключения let sBroker = service.sSrvRoot; //Инициализируем подключение const client = await mqtt.connectAsync(sBroker, { - clientId: connectionPrms.sClientIdRecipient, + clientId: settings.sClientIdRecipient, clean: true, - connectTimeout: connectionPrms.nConnectTimeout, + connectTimeout: settings.nConnectTimeout, username: service.sSrvUser, password: service.sSrvPass, - reconnectPeriod: connectionPrms.nReconnectPeriod + reconnectPeriod: settings.nReconnectPeriod }); - //Обходим функции сервиса - _.forEach(service.functions, fn => { + service.functions.forEach(fn => { client.subscribe(fn.sFnURL); }); - //Прослушиваем сообщения client.on("message", (topic, message) => { //Обрабатываем сообщение - processMQTTMessage({ message, service, fn: _.find(service.functions, { sFnURL: topic }) }); + processMQTTMessage({ + message, + service, + fn: service.functions.find(fn => { + return fn.sFnURL === topic; + }) + }); }); //Прослушиваем отключение от сервера client.on("offline", () => { @@ -69,7 +72,6 @@ const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logg //Сообщим о восстановлении соединения logger.info(`Соединение с MQTT восстановлено (${sBroker})`); }); - //Возвращаем подключение return client; } catch (e) { diff --git a/core/out_queue.js b/core/out_queue.js index c15acf7..fcab507 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -71,9 +71,9 @@ class OutQueue extends EventEmitter { //Привяжем методы к указателю на себя для использования в обработчиках событий this.outDetectingLoop = this.outDetectingLoop.bind(this); //Параметры подключения к Kafka - this.kafkaConnectionPrms = _.cloneDeep(prms.kafkaConnectionPrms); + this.kafka = prms.kafka; //Параметры подключения к MQTT - this.mqttConnectionPrms = _.cloneDeep(prms.mqttConnectionPrms); + this.mqtt = prms.mqtt; } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } @@ -164,8 +164,8 @@ class OutQueue extends EventEmitter { nId: prms.queue.nServiceFnId }), sProxy: this.sProxy, - kafkaConnectionPrms: this.kafkaConnectionPrms, - mqttConnectionPrms: this.mqttConnectionPrms + kafka: this.kafka, + mqtt: this.mqtt }); //Уменьшаем количество доступных обработчиков this.nWorkersLeft--; diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 867ac5f..1383fb1 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -20,6 +20,8 @@ const { parseOptionsXML, buildOptionsXML, deepMerge, + getKafkaConnectionSettings, + getMQTTConnectionSettings, getKafkaBroker, getKafkaAuth, getURLProtocol @@ -31,6 +33,7 @@ const objQueueSchema = require("../models/obj_queue"); //Схемы валида const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const { + SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER, @@ -151,20 +154,38 @@ const appProcess = async prms => { //Исходя из протокола собираем параметры switch (true) { //Kafka - case sProtocol === objServiceFnSchema.SPROTOCOL_KAFKA: + case sProtocol === objServiceSchema.SPROTOCOL_KAFKA: options.url = getKafkaBroker(prms.service.sSrvRoot); options.body = prms.queue.blMsg; options.topic = prms.function.sFnURL; options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass); + options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka); + //Если параметры подключения не считаны + if (!options.settings) { + //Расскажем об ошибке считывания + throw new ServerError( + SERR_UNEXPECTED, + `Ошибка получения настроек подключения к Kafka для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").` + ); + } //Указываем, что выполнение обработчика "После" невозможно bExecuteAfter = false; break; //mqtt и mqtts - case [objServiceFnSchema.SPROTOCOL_MQTT, objServiceFnSchema.SPROTOCOL_MQTTS].includes(sProtocol): + case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol): options.url = prms.service.sSrvRoot; options.body = prms.queue.blMsg; options.topic = prms.function.sFnURL; options.auth = { user: prms.service.sSrvUser, pass: prms.service.sSrvPass }; + options.settings = getMQTTConnectionSettings(prms.service.sCode, prms.mqtt); + //Если параметры подключения не считаны + if (!options.settings) { + //Расскажем об ошибке считывания + throw new ServerError( + SERR_UNEXPECTED, + `Ошибка получения настроек подключения к MQTT для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").` + ); + } //Указываем, что выполнение обработчика "После" невозможно bExecuteAfter = false; break; @@ -245,11 +266,9 @@ const appProcess = async prms => { objServiceFnSchema.NFN_PRMS_TYPE_PATCH, objServiceFnSchema.NFN_PRMS_TYPE_PUT ].includes(prms.function.nFnPrmsType) || - [ - objServiceFnSchema.SPROTOCOL_KAFKA, - objServiceFnSchema.SPROTOCOL_MQTT, - objServiceFnSchema.SPROTOCOL_MQTTS - ].includes(sProtocol) + [objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + sProtocol + ) ) { options.body = prms.queue.blMsg; } else { @@ -311,25 +330,25 @@ const appProcess = async prms => { nQueueId: prms.queue.nId }); } - //Ждем ответ от удалённого сервера - options.resolveWithFullResponse = true; + //Инициализируем ответ от сервера let serverResp = null; //Выполняем отправку исходя из протокола switch (true) { //Kafka - case sProtocol === objServiceFnSchema.SPROTOCOL_KAFKA: + case sProtocol === objServiceSchema.SPROTOCOL_KAFKA: serverResp = await publishKafka({ - connectionPrms: prms.kafkaConnectionPrms, + settings: options.settings, url: options.url, auth: options.auth, topic: options.topic, message: options.body }); + console.log(serverResp); break; //mqtt и mqtts - case [objServiceFnSchema.SPROTOCOL_MQTT, objServiceFnSchema.SPROTOCOL_MQTTS].includes(sProtocol): + case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol): serverResp = await publishMQTT({ - connectionPrms: prms.mqttConnectionPrms, + settings: options.settings, url: options.url, auth: options.auth, topic: options.topic, @@ -338,6 +357,9 @@ const appProcess = async prms => { break; //Другие default: + //Ждем ответ от удалённого сервера + options.resolveWithFullResponse = true; + //Отправляем запрос serverResp = await rqp(options); break; } @@ -593,8 +615,8 @@ const processTask = async prms => { service: prms.task.service, function: prms.task.function, sProxy: prms.task.sProxy, - kafkaConnectionPrms: prms.task.kafkaConnectionPrms, - mqttConnectionPrms: prms.task.mqttConnectionPrms + kafka: prms.task.kafka, + mqtt: prms.task.mqtt }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) { diff --git a/core/utils.js b/core/utils.js index 011023f..e4973d4 100644 --- a/core/utils.js +++ b/core/utils.js @@ -22,6 +22,7 @@ const { } = require("./constants"); //Глобавльные константы системы const { ServerError } = require("./server_errors"); //Ошибка сервера const prmsUtilsSchema = require("../models/prms_utils"); //Схемы валидации параметров функций +const { SPROTOCOL_HTTP, SPROTOCOL_KAFKA } = require("../models/obj_service"); //Схемы валидации сервиса //------------ // Тело модуля @@ -326,10 +327,49 @@ const deepMerge = (...args) => { return res; }; -//Получение брокера Kafka по адресу сервиса обмена +//Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...]) +const getConnectionSettings = (service, settingsArray) => { + //Считываем параметры и возвращаем + return settingsArray.find(connection => { + return connection.sService === service; + }); +}; + +//Считывание параметров подключения к Kafka для сервиса обмена (kafka - массив объектов [{sService: "", ...},...]) +const getKafkaConnectionSettings = (service, kafka) => { + //Считываем подключение с указанным сервисом обмена + let kafkaConnection = getConnectionSettings(service, kafka); + //Если нет подключения с указанным сервисом обмена + if (!kafkaConnection) { + //Считываем "По умолчанию" + kafkaConnection = getConnectionSettings("", kafka); + } + //Вернем результат + return kafkaConnection; +}; + +//Считывание параметров подключения к MQTT для сервиса обмена (mqtt - массив объектов [{sService: "", ...},...]) +const getMQTTConnectionSettings = (service, mqtt) => { + //Считываем подключение с указанным сервисом обмена + let mqttConnection = getConnectionSettings(service, mqtt); + //Если нет подключения с указанным сервисом обмена + if (!mqttConnection) { + //Считываем "По умолчанию" + mqttConnection = getConnectionSettings("", mqtt); + } + //Вернем результат + return mqttConnection; +}; + +//Получение брокера Kafka по адресу сервиса обмена (прим. kafka://server.ru -> server.ru, https://server.ru => undefined) const getKafkaBroker = sURL => { - //Убираем лишние символы - return sURL.slice(8); + //Если протокол URL - Kafka + if (getURLProtocol(sURL) === SPROTOCOL_KAFKA) { + //Возвращаем брокера + return sURL.slice(8); + } + //Возвращаем undefined + return; }; //Получение авторизации для Kafka @@ -337,10 +377,10 @@ const getKafkaAuth = (sUser, sPass) => { return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null; }; -//Получение протокола адреса +//Получение протокола адреса (прим. mqtt://server.ru -> mqtt, https://server.ru => https, ...) const getURLProtocol = sURL => { - //Считываем протокол адреса - return new URL(sURL).protocol.slice(0, -1); + //Если начинается с "/" - HTTP, иначе получаем из URL + return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1); }; //----------------- @@ -361,6 +401,8 @@ exports.parseOptionsXML = parseOptionsXML; exports.buildOptionsXML = buildOptionsXML; exports.getNowString = getNowString; exports.deepMerge = deepMerge; +exports.getKafkaConnectionSettings = getKafkaConnectionSettings; +exports.getMQTTConnectionSettings = getMQTTConnectionSettings; exports.getKafkaBroker = getKafkaBroker; exports.getKafkaAuth = getKafkaAuth; exports.getURLProtocol = getURLProtocol; diff --git a/models/obj_service.js b/models/obj_service.js index 5d86db2..598c0fa 100644 --- a/models/obj_service.js +++ b/models/obj_service.js @@ -33,6 +33,13 @@ const NIS_AUTH_NO = 0; //Неаутентифицирован const SIS_AUTH_YES = "IS_AUTH_YES"; //Аутентифицирован (строковый код) const SIS_AUTH_NO = "IS_AUTH_NO"; //Неаутентифицирован (строковый код) +//Протоколы работы сервиса +const SPROTOCOL_HTTP = "http"; //Протокол HTTP +const SPROTOCOL_HTTPS = "https"; //Протокол HTTPS +const SPROTOCOL_MQTT = "mqtt"; //Протокол MQTT +const SPROTOCOL_MQTTS = "mqtts"; //Протокол MQTTS +const SPROTOCOL_KAFKA = "kafka"; //Протокол для работы с KAFKA + //------------- // Тело модуля //------------- @@ -59,6 +66,11 @@ exports.NIS_AUTH_YES = NIS_AUTH_YES; exports.NIS_AUTH_NO = NIS_AUTH_NO; exports.SIS_AUTH_YES = SIS_AUTH_YES; exports.SIS_AUTH_NO = SIS_AUTH_NO; +exports.SPROTOCOL_HTTP = SPROTOCOL_HTTP; +exports.SPROTOCOL_HTTPS = SPROTOCOL_HTTPS; +exports.SPROTOCOL_MQTT = SPROTOCOL_MQTT; +exports.SPROTOCOL_MQTTS = SPROTOCOL_MQTTS; +exports.SPROTOCOL_KAFKA = SPROTOCOL_KAFKA; //Схема валидации сервиса exports.Service = new Schema({ @@ -144,10 +156,8 @@ exports.Service = new Schema({ enum: [NUNAVLBL_NTF_SIGN_NO, NUNAVLBL_NTF_SIGN_YES], required: true, message: { - type: path => - `Признак необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`, - enum: path => - `Значение признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`, + type: path => `Признак необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`, + enum: path => `Значение признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`, required: path => `Не указан признак необходимости оповещения о простое внешнего сервиса (${path})` } }, @@ -159,10 +169,8 @@ exports.Service = new Schema({ message: { type: path => `Строковый код признака необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`, - enum: path => - `Значение строкового кода признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`, - required: path => - `Не указан строковый код признака необходимости оповещения о простое внешнего сервиса (${path})` + enum: path => `Значение строкового кода признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`, + required: path => `Не указан строковый код признака необходимости оповещения о простое внешнего сервиса (${path})` } }, //Максимальное время простоя (мин) удалённого сервиса для генерации оповещения @@ -172,8 +180,7 @@ exports.Service = new Schema({ message: { type: path => `Максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path}) имеет некорректный тип данных (ожидалось - Number)`, - required: path => - `Не указано максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path})` + required: path => `Не указано максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path})` } }, //Список адресов E-Mail для оповещения о простое внешнего сервиса @@ -194,8 +201,7 @@ exports.Service = new Schema({ type: String, required: false, message: { - type: path => - `Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан адрес прокси-сервера в очереди обмена (${path})` } }, @@ -237,8 +243,7 @@ exports.ServiceCtx = new Schema({ type: String, required: false, message: { - type: path => - `Строковое представление даты истечения контекста (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Строковое представление даты истечения контекста (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указано строковое представление даты истечения контекста (${path})` } }, @@ -248,8 +253,7 @@ exports.ServiceCtx = new Schema({ enum: [NIS_AUTH_YES, NIS_AUTH_NO], required: true, message: { - type: path => - `Признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`, enum: path => `Значение признака аутентицированности сервиса (${path}) не поддерживается`, required: path => `Не указан признак аутентицированности сервиса (${path})` } @@ -260,8 +264,7 @@ exports.ServiceCtx = new Schema({ enum: [SIS_AUTH_YES, SIS_AUTH_NO], required: true, message: { - type: path => - `Строковый код признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Строковый код признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`, enum: path => `Значение строкового кода признака аутентицированности сервиса (${path}) не поддерживается`, required: path => `Не указан строковый код признака аутентицированности сервиса (${path})` } @@ -284,8 +287,7 @@ exports.ServiceExpiredQueueInfo = new Schema({ type: Number, required: true, message: { - type: path => - `Количество просроченных сообщений обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Количество просроченных сообщений обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указано количество просроченных сообщений обмена (${path})` } }, @@ -294,8 +296,7 @@ exports.ServiceExpiredQueueInfo = new Schema({ type: String, required: true, message: { - type: path => - `Информация о просроченных сообщениях обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Информация о просроченных сообщениях обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указана информация о просроченных сообщениях обмена (${path})` } } diff --git a/models/obj_service_function.js b/models/obj_service_function.js index 0be959d..a8e24c3 100644 --- a/models/obj_service_function.js +++ b/models/obj_service_function.js @@ -70,13 +70,6 @@ const NERR_NTF_SIGN_YES = 1; //Оповещать об ошибке исполн const SERR_NTF_SIGN_NO = "ERR_NTF_SIGN_NO"; //Не оповещать об ошибке исполнения (строковый код) const SERR_NTF_SIGN_YES = "ERR_NTF_SIGN_YES"; //Оповещать об ошибке исполнения (строковый код) -//Протоколы работы сервиса -const SPROTOCOL_HTTP = "http"; //Протокол HTTP -const SPROTOCOL_HTTPS = "https"; //Протокол HTTPS -const SPROTOCOL_MQTT = "mqtt"; //Протокол MQTT -const SPROTOCOL_MQTTS = "mqtts"; //Протокол MQTTS -const SPROTOCOL_KAFKA = "kafka"; //Протокол для работы с KAFKA - //------------- // Тело модуля //------------- @@ -146,11 +139,6 @@ exports.NERR_NTF_SIGN_NO = NERR_NTF_SIGN_NO; exports.NERR_NTF_SIGN_YES = NERR_NTF_SIGN_YES; exports.SERR_NTF_SIGN_NO = SERR_NTF_SIGN_NO; exports.SERR_NTF_SIGN_YES = SERR_NTF_SIGN_YES; -exports.SPROTOCOL_HTTP = SPROTOCOL_HTTP; -exports.SPROTOCOL_HTTPS = SPROTOCOL_HTTPS; -exports.SPROTOCOL_MQTT = SPROTOCOL_MQTT; -exports.SPROTOCOL_MQTTS = SPROTOCOL_MQTTS; -exports.SPROTOCOL_KAFKA = SPROTOCOL_KAFKA; //Схема валидации функции сервиса exports.ServiceFunction = new Schema({