diff --git a/config.js b/config.js
index edea45f..c86ddc7 100644
--- a/config.js
+++ b/config.js
@@ -72,34 +72,42 @@ let inComing = {
};
//Параметры подключения к Kafka
-let kafkaConnection = {
- //ID клиента-отправителя
- sClientIdSender: "Parus",
- //ID клиента-получателя
- sClientIdRecipient: "Parus",
- //Группа получателя
- sGroupId: "Parus",
- //Время ожидания успешного подключения (мс)
- nConnectionTimeout: 5000,
- //Необходимость попытки переподключения при потере соединения
- bRestartOnFailure: false,
- //Время максимального ожидания между попытками переподключения (мс)
- nMaxRetryTime: 20000,
- //Время ожидания между попытками переподключения (мс)
- nInitialRetryTime: 10000
-};
+const kafka = [
+ {
+ //Мнемокод сервиса обмена (пусто - использовать по умолчанию)
+ sService: "",
+ //ID клиента-отправителя
+ sClientIdSender: "Parus",
+ //ID клиента-получателя
+ sClientIdRecipient: "Parus",
+ //Группа получателя
+ sGroupId: "Parus",
+ //Время ожидания успешного подключения (мс)
+ nConnectionTimeout: 5000,
+ //Необходимость попытки переподключения при потере соединения
+ bRestartOnFailure: true,
+ //Время максимального ожидания между попытками переподключения (мс)
+ nMaxRetryTime: 20000,
+ //Время ожидания между попытками переподключения (мс)
+ nInitialRetryTime: 10000
+ }
+];
-//Параметры подключения по MQTT протоколу
-let mqttConnection = {
- //ID клиента-отправителя
- sClientIdSender: "Parus",
- //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться)
- sClientIdRecipient: "ParusRecipient",
- //Время ожидания успешного подключения (мс)
- nConnectTimeout: 5000,
- //Время ожидания между попытками переподключения (мс)
- nReconnectPeriod: 10000
-};
+//Параметры подключения к MQTT
+const mqtt = [
+ {
+ //Мнемокод сервиса обмена (пусто - использовать по умолчанию)
+ sService: "",
+ //ID клиента-отправителя
+ sClientIdSender: "Parus",
+ //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться)
+ sClientIdRecipient: "ParusRecipient",
+ //Время ожидания успешного подключения (мс)
+ nConnectTimeout: 5000,
+ //Время ожидания между попытками переподключения (мс)
+ nReconnectPeriod: 10000
+ }
+];
//Параметры отправки E-Mail уведомлений
let mail = {
@@ -128,7 +136,7 @@ module.exports = {
dbConnect,
outGoing,
inComing,
- kafkaConnection,
- mqttConnection,
+ kafka,
+ mqtt,
mail
};
diff --git a/config_default.js b/config_default.js
index 1d5007f..c289e71 100644
--- a/config_default.js
+++ b/config_default.js
@@ -72,32 +72,42 @@ let inComing = {
};
//Параметры подключения к Kafka
-let kafkaConnection = {
- //ID клиента-отправителя
- sClientIdSender: "Parus",
- //ID клиента-получателя
- sClientIdRecipient: "Parus",
- //Время ожидания успешного подключения (мс)
- nConnectionTimeout: 5000,
- //Необходимость попытки переподключения при потере соединения
- bRestartOnFailure: false,
- //Время максимального ожидания между попытками переподключения (мс)
- nMaxRetryTime: 20000,
- //Время ожидания между попытками переподключения (мс)
- nInitialRetryTime: 10000
-};
+const kafka = [
+ {
+ //Мнемокод сервиса обмена (пусто - использовать по умолчанию)
+ sService: "",
+ //ID клиента-отправителя
+ sClientIdSender: "Parus",
+ //ID клиента-получателя
+ sClientIdRecipient: "Parus",
+ //Группа получателя
+ sGroupId: "Parus",
+ //Время ожидания успешного подключения (мс)
+ nConnectionTimeout: 5000,
+ //Необходимость попытки переподключения при потере соединения
+ bRestartOnFailure: true,
+ //Время максимального ожидания между попытками переподключения (мс)
+ nMaxRetryTime: 20000,
+ //Время ожидания между попытками переподключения (мс)
+ nInitialRetryTime: 10000
+ }
+];
-//Параметры подключения по MQTT протоколу
-let mqttConnection = {
- //ID клиента-отправителя
- sClientIdSender: "Parus",
- //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться)
- sClientIdRecipient: "ParusRecipient",
- //Время ожидания успешного подключения (мс)
- nConnectTimeout: 5000,
- //Время ожидания между попытками переподключения (мс)
- nReconnectPeriod: 10000
-};
+//Параметры подключения к MQTT
+const mqtt = [
+ {
+ //Мнемокод сервиса обмена (пусто - использовать по умолчанию)
+ sService: "",
+ //ID клиента-отправителя
+ sClientIdSender: "Parus",
+ //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться)
+ sClientIdRecipient: "ParusRecipient",
+ //Время ожидания успешного подключения (мс)
+ nConnectTimeout: 5000,
+ //Время ожидания между попытками переподключения (мс)
+ nReconnectPeriod: 10000
+ }
+];
//Параметры отправки E-Mail уведомлений
let mail = {
@@ -126,7 +136,7 @@ module.exports = {
dbConnect,
outGoing,
inComing,
- kafkaConnection,
- mqttConnection,
+ kafka,
+ mqtt,
mail
};
diff --git a/core/app.js b/core/app.js
index cf406d5..c6d096e 100644
--- a/core/app.js
+++ b/core/app.js
@@ -96,7 +96,7 @@ class ParusAppServer {
//Запускаем обслуживание очереди входящих
await this.logger.info("Запуск обработчика очереди входящих сообщений...");
try {
- this.inQ.startProcessing({ services: this.services });
+ await this.inQ.startProcessing({ services: this.services });
} catch (e) {
await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
await this.stop();
@@ -218,8 +218,8 @@ class ParusAppServer {
logger: this.logger,
notifier: this.notifier,
sProxy: prms.config.outGoing.sProxy,
- kafkaConnectionPrms: prms.config.kafkaConnection,
- mqttConnectionPrms: prms.config.mqttConnection
+ kafka: prms.config.kafka,
+ mqtt: prms.config.mqtt
});
//Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({
@@ -228,8 +228,8 @@ class ParusAppServer {
dbConn: this.dbConn,
logger: this.logger,
notifier: this.notifier,
- kafkaConnectionPrms: prms.config.kafkaConnection,
- mqttConnectionPrms: prms.config.mqttConnection
+ kafka: prms.config.kafka,
+ mqtt: prms.config.mqtt
});
//Создаём контроллер доступности удалённых сервисов
this.srvAvlCtrl = new sac.ServiceAvailableController({
diff --git a/core/in_queue.js b/core/in_queue.js
index 316ecc3..ce20127 100644
--- a/core/in_queue.js
+++ b/core/in_queue.js
@@ -21,7 +21,9 @@ const {
buildOptionsXML,
parseOptionsXML,
deepMerge,
- getKafkaBroker
+ getKafkaConnectionSettings,
+ getMQTTConnectionSettings,
+ getURLProtocol
} = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений
@@ -83,9 +85,9 @@ class InQueue extends EventEmitter {
//WEB-сервер
this.srv = null;
//Параметры подключения к Kafka
- this.kafkaConnectionPrms = _.cloneDeep(prms.kafkaConnectionPrms);
+ this.kafka = prms.kafka;
//Параметры подключения к MQTT
- this.mqttConnectionPrms = _.cloneDeep(prms.mqttConnectionPrms);
+ this.mqtt = prms.mqtt;
//Внешние подключения
this.kafkaConnections = [];
this.mqttConnections = [];
@@ -697,7 +699,7 @@ class InQueue extends EventEmitter {
}
//Запуск обработки очереди входящих сообщений
- startProcessing(prms) {
+ async startProcessing(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений");
//Если структура объекта в норме
@@ -715,106 +717,135 @@ class InQueue extends EventEmitter {
//Конфигурируем сервер - обработка тела сообщения
this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" }));
//Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений
- _.forEach(_.filter(this.services, { nSrvType: objServiceSchema.NSRV_TYPE_RECIVE }), srvs => {
- //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает
- this.webApp.all(srvs.sSrvRoot, (req, res) => {
- res.status(200).send(
- `
Сервер приложений ПП Парус 8
(${this.common.sVersion} релиз ${this.common.sRelease})
Сервис: ${srvs.sName}
`
+ _.forEach(
+ _.filter(this.services, srv => {
+ return (
+ srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE &&
+ [objServiceSchema.SPROTOCOL_HTTP, objServiceSchema.SPROTOCOL_HTTPS].includes(getURLProtocol(srv.sSrvRoot))
);
- });
- //Для всех статических функций сервиса...
- _.forEach(
- _.filter(srvs.functions, fn => fn.sFnURL.startsWith("@")),
- fn => {
- this.webApp.use(
- buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }),
- express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`)
+ }),
+ srvs => {
+ //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает
+ this.webApp.all(srvs.sSrvRoot, (req, res) => {
+ res.status(200).send(
+ `
Сервер приложений ПП Парус 8
(${this.common.sVersion} релиз ${this.common.sRelease})
Сервис: ${srvs.sName}
`
);
- }
- );
- //Для всех функций сервиса (кроме статических)...
- _.forEach(
- _.filter(srvs.functions, fn => !fn.sFnURL.startsWith("@")),
- fn => {
- //...собственный обработчик, в зависимости от указанного способа передачи параметров
- this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => {
- try {
- //Вызываем обработчик
- await this.processMessage({ req, res, service: srvs, function: fn });
- } catch (e) {
+ });
+ //Для всех статических функций сервиса...
+ _.forEach(
+ _.filter(srvs.functions, fn => fn.sFnURL.startsWith("@")),
+ fn => {
+ this.webApp.use(
+ buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }),
+ express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`)
+ );
+ }
+ );
+ //Для всех функций сервиса (кроме статических)...
+ _.forEach(
+ _.filter(srvs.functions, fn => !fn.sFnURL.startsWith("@")),
+ fn => {
+ //...собственный обработчик, в зависимости от указанного способа передачи параметров
+ this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => {
+ try {
+ //Вызываем обработчик
+ await this.processMessage({ req, res, service: srvs, function: fn });
+ } catch (e) {
+ //Протоколируем в журнал работы сервера
+ await this.logger.error(makeErrorText(e), {
+ nServiceId: srvs.nId,
+ nServiceFnId: fn.nId
+ });
+ //Отправим ошибку клиенту
+ res.status(500).send(makeErrorText(e));
+ }
+ });
+ //...и собственный обработчик ошибок
+ this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => {
//Протоколируем в журнал работы сервера
- await this.logger.error(makeErrorText(e), {
+ await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
nServiceId: srvs.nId,
nServiceFnId: fn.nId
});
//Отправим ошибку клиенту
- res.status(500).send(makeErrorText(e));
- }
- });
- //...и собственный обработчик ошибок
- this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => {
- //Протоколируем в журнал работы сервера
- await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
- nServiceId: srvs.nId,
- nServiceFnId: fn.nId
+ res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
});
- //Отправим ошибку клиенту
- res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
- });
- }
- );
- });
+ }
+ );
+ }
+ );
+ //Инициализируем настройки подключения
+ let connectionSettings = null;
//Считываем прием сообщений по Kafka
- let kafkaSrvs = _.filter(this.services, srv => {
- return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && srv.sSrvRoot.startsWith("kafka://");
+ let kafkaSrvs = this.services.filter(srv => {
+ return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && getURLProtocol(srv.sSrvRoot) === objServiceSchema.SPROTOCOL_KAFKA;
});
//Если есть сервисы с приемом сообщений по Kafka
if (kafkaSrvs.length !== 0) {
//Обходим данные сервисы
- _.forEach(kafkaSrvs, async srvs => {
+ for (let srv of kafkaSrvs) {
//Если у сервиса обмена есть функции
- if (srvs.functions.length !== 0) {
- //Подключаемся и подписываемся на соответствующий брокер
- let connectionKafka = await subscribeKafka({
- connectionPrms: this.kafkaConnectionPrms,
- service: srvs,
- processKafkaMessage: prms => this.processKafkaMessage(prms),
- logger: this.logger
- });
- //Если подключение было создано
- if (connectionKafka) {
- //Добавляем в общий список подключений kafka
- this.kafkaConnections.push(connectionKafka);
+ if (srv.functions.length !== 0) {
+ //Считываем настройки подключения к Kafka
+ connectionSettings = getKafkaConnectionSettings(srv.sCode, this.kafka);
+ //Если настройки подключения считаны
+ if (connectionSettings) {
+ //Подключаемся и подписываемся на соответствующий брокер
+ let connectionKafka = await subscribeKafka({
+ settings: connectionSettings,
+ service: srv,
+ processKafkaMessage: prms => this.processKafkaMessage(prms),
+ logger: this.logger
+ });
+ //Если подключение было создано
+ if (connectionKafka) {
+ //Добавляем в общий список подключений kafka
+ this.kafkaConnections.push(connectionKafka);
+ }
+ } else {
+ await this.logger.error(
+ `Ошибка получения настроек подключения к Kafka для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").`
+ );
}
}
- });
+ }
}
//Считываем прием сообщений по MQTT
- let mqttSrvs = _.filter(this.services, srv => {
+ let mqttSrvs = this.services.filter(srv => {
return (
- srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && (srv.sSrvRoot.startsWith("mqtt://") || srv.sSrvRoot.startsWith("mqtts://"))
+ srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE &&
+ [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(getURLProtocol(srv.sSrvRoot))
);
});
//Если есть сервисы с приемом сообщений по MQTT
if (mqttSrvs.length !== 0) {
//Обходим данные сервисы
- _.forEach(mqttSrvs, async srvs => {
+ for (let srv of mqttSrvs) {
//Если у сервиса обмена есть функции
- if (srvs.functions.length !== 0) {
- //Подключаемся и подписываемся на соответствующий брокер
- let connectionMQTT = await subscribeMQTT({
- connectionPrms: this.mqttConnectionPrms,
- service: srvs,
- processMQTTMessage: prms => this.processMQTTMessage(prms),
- logger: this.logger
- });
- //Если подключение было создано
- if (connectionMQTT) {
- //Добавляем в общий список подключений kafka
- this.mqttConnections.push(connectionMQTT);
+ if (srv.functions.length !== 0) {
+ //Считываем настройки подключения к MQTT
+ connectionSettings = getMQTTConnectionSettings(srv.sCode, this.mqtt);
+ //Если настройки подключения считаны
+ if (connectionSettings) {
+ //Подключаемся и подписываемся на соответствующий брокер
+ let connectionMQTT = await subscribeMQTT({
+ settings: connectionSettings,
+ service: srv,
+ processMQTTMessage: prms => this.processMQTTMessage(prms),
+ logger: this.logger
+ });
+ //Если подключение было создано
+ if (connectionMQTT) {
+ //Добавляем в общий список подключений kafka
+ this.mqttConnections.push(connectionMQTT);
+ }
+ } else {
+ await this.logger.error(
+ `Ошибка получения настроек подключения к MQTT для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").`
+ );
}
}
- });
+ }
}
//Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND
this.webApp.use("*", (req, res) => {
diff --git a/core/kafka_connector.js b/core/kafka_connector.js
index be09621..5504fa6 100644
--- a/core/kafka_connector.js
+++ b/core/kafka_connector.js
@@ -7,7 +7,6 @@
// Подключение библиотек
//----------------------
-const _ = require("lodash"); //Работа с массивами и коллекциями
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
@@ -16,12 +15,12 @@ const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
//------------
//Отправка сообщения Kafka
-const publishKafka = async ({ connectionPrms, url, auth, topic, message }) => {
+const publishKafka = async ({ settings, url, auth, topic, message }) => {
//Иницализируем подключение к Kafka
let kafka = new Kafka({
- clientId: connectionPrms.sClientIdSender,
+ clientId: settings.sClientIdSender,
brokers: [url],
- connectionTimeout: connectionPrms.nConnectionTimeout,
+ connectionTimeout: settings.nConnectionTimeout,
logLevel: logLevel.NOTHING,
...auth
});
@@ -30,15 +29,15 @@ const publishKafka = async ({ connectionPrms, url, auth, topic, message }) => {
//Подключаемся к Kafka
await producer.connect();
//Отправляем сообщение
- let res = await producer.send({ topic: topic, messages: [{ value: message }] });
+ await producer.send({ topic: topic, messages: [{ value: message }] });
//Отключаемся
await producer.disconnect();
- //Возвращаем ответ
- return res;
+ //Возвращаем статус успешной отправки
+ return { statusCode: 200 };
};
//Получение MQTT сообщений
-const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, logger }) => {
+const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => {
try {
//Признак необходимости вывода сообщения о потере соединения
let bLogLostConnection = true;
@@ -46,15 +45,15 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo
let sBroker = getKafkaBroker(service.sSrvRoot);
//Иницализируем подключение к Kafka
let client = new Kafka({
- clientId: connectionPrms.sClientIdRecipient,
+ clientId: settings.sClientIdRecipient,
brokers: [sBroker],
- connectionTimeout: connectionPrms.nConnectionTimeout,
+ connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass),
logLevel: logLevel.NOTHING,
retry: {
retries: 0,
- maxRetryTime: connectionPrms.nMaxRetryTime,
- initialRetryTime: connectionPrms.nInitialRetryTime,
+ maxRetryTime: settings.nMaxRetryTime,
+ initialRetryTime: settings.nInitialRetryTime,
restartOnFailure: error => {
return new Promise(resolve => {
//Если требуется вывести ошибку
@@ -64,19 +63,20 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo
//Сбрасываем признак необходимости вывода ошибки
bLogLostConnection = false;
}
- resolve(connectionPrms.bRestartOnFailure);
+ resolve(settings.bRestartOnFailure);
});
}
}
});
-
//Инициализируем получателя
- let consumer = client.consumer({ groupId: connectionPrms.sGroupId });
-
+ let consumer = client.consumer({ groupId: settings.sGroupId });
//Устанавливаем прослушивание
await consumer.connect();
- consumer.subscribe({ topics: _.map(service.functions, "sFnURL") });
-
+ consumer.subscribe({
+ topics: service.functions.map(fn => {
+ return fn.sFnURL;
+ })
+ });
//Запускаем прослушивание необходимых топиков
consumer.run({
eachMessage: async ({ topic, message }) => {
@@ -85,14 +85,15 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo
processKafkaMessage({
message,
service,
- fn: _.find(service.functions, { sFnURL: topic })
+ fn: service.functions.find(fn => {
+ return fn.sFnURL === topic;
+ })
});
} catch (e) {
await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`);
}
}
});
-
//Отслеживаем соединение
consumer.on(consumer.events.CONNECT, () => {
//Если сообщение о потере соединения уже выводилось
diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js
index f63f226..17915f5 100644
--- a/core/mqtt_connector.js
+++ b/core/mqtt_connector.js
@@ -7,7 +7,6 @@
// Подключение библиотек
//----------------------
-const _ = require("lodash"); //Работа с массивами и коллекциями
const { makeErrorText } = require("./utils"); //Вспомогательные функции
const mqtt = require("mqtt"); //Работа с MQTT
@@ -16,48 +15,52 @@ const mqtt = require("mqtt"); //Работа с MQTT
//------------
//Отправка MQTT сообщения
-const publishMQTT = async ({ connectionPrms, url, auth, topic, message }) => {
+const publishMQTT = async ({ settings, url, auth, topic, message }) => {
//Инициализируем подключение
const client = await mqtt.connectAsync(url, {
- clientId: connectionPrms.sClientIdSender,
+ clientId: settings.sClientIdSender,
clean: true,
- connectTimeout: connectionPrms.nConnectTimeout,
+ connectTimeout: settings.nConnectTimeout,
username: auth.user,
password: auth.pass,
- reconnectPeriod: connectionPrms.nReconnectPeriod
+ reconnectPeriod: settings.nReconnectPeriod
});
//Отправляем сообщение
await client.publishAsync(topic, message);
//Закрываем подключение
await client.endAsync();
- //Возвращаем сообщение, которое было отправлено
+ //Возвращаем статус успешной отправки
return { statusCode: 200 };
};
//Получение MQTT сообщений
-const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logger }) => {
+const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => {
try {
//Инициализируем строку подключения
let sBroker = service.sSrvRoot;
//Инициализируем подключение
const client = await mqtt.connectAsync(sBroker, {
- clientId: connectionPrms.sClientIdRecipient,
+ clientId: settings.sClientIdRecipient,
clean: true,
- connectTimeout: connectionPrms.nConnectTimeout,
+ connectTimeout: settings.nConnectTimeout,
username: service.sSrvUser,
password: service.sSrvPass,
- reconnectPeriod: connectionPrms.nReconnectPeriod
+ reconnectPeriod: settings.nReconnectPeriod
});
-
//Обходим функции сервиса
- _.forEach(service.functions, fn => {
+ service.functions.forEach(fn => {
client.subscribe(fn.sFnURL);
});
-
//Прослушиваем сообщения
client.on("message", (topic, message) => {
//Обрабатываем сообщение
- processMQTTMessage({ message, service, fn: _.find(service.functions, { sFnURL: topic }) });
+ processMQTTMessage({
+ message,
+ service,
+ fn: service.functions.find(fn => {
+ return fn.sFnURL === topic;
+ })
+ });
});
//Прослушиваем отключение от сервера
client.on("offline", () => {
@@ -69,7 +72,6 @@ const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logg
//Сообщим о восстановлении соединения
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
});
-
//Возвращаем подключение
return client;
} catch (e) {
diff --git a/core/out_queue.js b/core/out_queue.js
index c15acf7..fcab507 100644
--- a/core/out_queue.js
+++ b/core/out_queue.js
@@ -71,9 +71,9 @@ class OutQueue extends EventEmitter {
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.outDetectingLoop = this.outDetectingLoop.bind(this);
//Параметры подключения к Kafka
- this.kafkaConnectionPrms = _.cloneDeep(prms.kafkaConnectionPrms);
+ this.kafka = prms.kafka;
//Параметры подключения к MQTT
- this.mqttConnectionPrms = _.cloneDeep(prms.mqttConnectionPrms);
+ this.mqtt = prms.mqtt;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
@@ -164,8 +164,8 @@ class OutQueue extends EventEmitter {
nId: prms.queue.nServiceFnId
}),
sProxy: this.sProxy,
- kafkaConnectionPrms: this.kafkaConnectionPrms,
- mqttConnectionPrms: this.mqttConnectionPrms
+ kafka: this.kafka,
+ mqtt: this.mqtt
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js
index 867ac5f..1383fb1 100644
--- a/core/out_queue_processor.js
+++ b/core/out_queue_processor.js
@@ -20,6 +20,8 @@ const {
parseOptionsXML,
buildOptionsXML,
deepMerge,
+ getKafkaConnectionSettings,
+ getMQTTConnectionSettings,
getKafkaBroker,
getKafkaAuth,
getURLProtocol
@@ -31,6 +33,7 @@ const objQueueSchema = require("../models/obj_queue"); //Схемы валида
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
const {
+ SERR_UNEXPECTED,
SERR_OBJECT_BAD_INTERFACE,
SERR_APP_SERVER_BEFORE,
SERR_APP_SERVER_AFTER,
@@ -151,20 +154,38 @@ const appProcess = async prms => {
//Исходя из протокола собираем параметры
switch (true) {
//Kafka
- case sProtocol === objServiceFnSchema.SPROTOCOL_KAFKA:
+ case sProtocol === objServiceSchema.SPROTOCOL_KAFKA:
options.url = getKafkaBroker(prms.service.sSrvRoot);
options.body = prms.queue.blMsg;
options.topic = prms.function.sFnURL;
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass);
+ options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
+ //Если параметры подключения не считаны
+ if (!options.settings) {
+ //Расскажем об ошибке считывания
+ throw new ServerError(
+ SERR_UNEXPECTED,
+ `Ошибка получения настроек подключения к Kafka для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").`
+ );
+ }
//Указываем, что выполнение обработчика "После" невозможно
bExecuteAfter = false;
break;
//mqtt и mqtts
- case [objServiceFnSchema.SPROTOCOL_MQTT, objServiceFnSchema.SPROTOCOL_MQTTS].includes(sProtocol):
+ case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
options.url = prms.service.sSrvRoot;
options.body = prms.queue.blMsg;
options.topic = prms.function.sFnURL;
options.auth = { user: prms.service.sSrvUser, pass: prms.service.sSrvPass };
+ options.settings = getMQTTConnectionSettings(prms.service.sCode, prms.mqtt);
+ //Если параметры подключения не считаны
+ if (!options.settings) {
+ //Расскажем об ошибке считывания
+ throw new ServerError(
+ SERR_UNEXPECTED,
+ `Ошибка получения настроек подключения к MQTT для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").`
+ );
+ }
//Указываем, что выполнение обработчика "После" невозможно
bExecuteAfter = false;
break;
@@ -245,11 +266,9 @@ const appProcess = async prms => {
objServiceFnSchema.NFN_PRMS_TYPE_PATCH,
objServiceFnSchema.NFN_PRMS_TYPE_PUT
].includes(prms.function.nFnPrmsType) ||
- [
- objServiceFnSchema.SPROTOCOL_KAFKA,
- objServiceFnSchema.SPROTOCOL_MQTT,
- objServiceFnSchema.SPROTOCOL_MQTTS
- ].includes(sProtocol)
+ [objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
+ sProtocol
+ )
) {
options.body = prms.queue.blMsg;
} else {
@@ -311,25 +330,25 @@ const appProcess = async prms => {
nQueueId: prms.queue.nId
});
}
- //Ждем ответ от удалённого сервера
- options.resolveWithFullResponse = true;
+ //Инициализируем ответ от сервера
let serverResp = null;
//Выполняем отправку исходя из протокола
switch (true) {
//Kafka
- case sProtocol === objServiceFnSchema.SPROTOCOL_KAFKA:
+ case sProtocol === objServiceSchema.SPROTOCOL_KAFKA:
serverResp = await publishKafka({
- connectionPrms: prms.kafkaConnectionPrms,
+ settings: options.settings,
url: options.url,
auth: options.auth,
topic: options.topic,
message: options.body
});
+ console.log(serverResp);
break;
//mqtt и mqtts
- case [objServiceFnSchema.SPROTOCOL_MQTT, objServiceFnSchema.SPROTOCOL_MQTTS].includes(sProtocol):
+ case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
serverResp = await publishMQTT({
- connectionPrms: prms.mqttConnectionPrms,
+ settings: options.settings,
url: options.url,
auth: options.auth,
topic: options.topic,
@@ -338,6 +357,9 @@ const appProcess = async prms => {
break;
//Другие
default:
+ //Ждем ответ от удалённого сервера
+ options.resolveWithFullResponse = true;
+ //Отправляем запрос
serverResp = await rqp(options);
break;
}
@@ -593,8 +615,8 @@ const processTask = async prms => {
service: prms.task.service,
function: prms.task.function,
sProxy: prms.task.sProxy,
- kafkaConnectionPrms: prms.task.kafkaConnectionPrms,
- mqttConnectionPrms: prms.task.mqttConnectionPrms
+ kafka: prms.task.kafka,
+ mqtt: prms.task.mqtt
});
//Если результат обработки ошибка - пробрасываем её дальше
if (res instanceof ServerError) {
diff --git a/core/utils.js b/core/utils.js
index 011023f..e4973d4 100644
--- a/core/utils.js
+++ b/core/utils.js
@@ -22,6 +22,7 @@ const {
} = require("./constants"); //Глобавльные константы системы
const { ServerError } = require("./server_errors"); //Ошибка сервера
const prmsUtilsSchema = require("../models/prms_utils"); //Схемы валидации параметров функций
+const { SPROTOCOL_HTTP, SPROTOCOL_KAFKA } = require("../models/obj_service"); //Схемы валидации сервиса
//------------
// Тело модуля
@@ -326,10 +327,49 @@ const deepMerge = (...args) => {
return res;
};
-//Получение брокера Kafka по адресу сервиса обмена
+//Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...])
+const getConnectionSettings = (service, settingsArray) => {
+ //Считываем параметры и возвращаем
+ return settingsArray.find(connection => {
+ return connection.sService === service;
+ });
+};
+
+//Считывание параметров подключения к Kafka для сервиса обмена (kafka - массив объектов [{sService: "", ...},...])
+const getKafkaConnectionSettings = (service, kafka) => {
+ //Считываем подключение с указанным сервисом обмена
+ let kafkaConnection = getConnectionSettings(service, kafka);
+ //Если нет подключения с указанным сервисом обмена
+ if (!kafkaConnection) {
+ //Считываем "По умолчанию"
+ kafkaConnection = getConnectionSettings("", kafka);
+ }
+ //Вернем результат
+ return kafkaConnection;
+};
+
+//Считывание параметров подключения к MQTT для сервиса обмена (mqtt - массив объектов [{sService: "", ...},...])
+const getMQTTConnectionSettings = (service, mqtt) => {
+ //Считываем подключение с указанным сервисом обмена
+ let mqttConnection = getConnectionSettings(service, mqtt);
+ //Если нет подключения с указанным сервисом обмена
+ if (!mqttConnection) {
+ //Считываем "По умолчанию"
+ mqttConnection = getConnectionSettings("", mqtt);
+ }
+ //Вернем результат
+ return mqttConnection;
+};
+
+//Получение брокера Kafka по адресу сервиса обмена (прим. kafka://server.ru -> server.ru, https://server.ru => undefined)
const getKafkaBroker = sURL => {
- //Убираем лишние символы
- return sURL.slice(8);
+ //Если протокол URL - Kafka
+ if (getURLProtocol(sURL) === SPROTOCOL_KAFKA) {
+ //Возвращаем брокера
+ return sURL.slice(8);
+ }
+ //Возвращаем undefined
+ return;
};
//Получение авторизации для Kafka
@@ -337,10 +377,10 @@ const getKafkaAuth = (sUser, sPass) => {
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
};
-//Получение протокола адреса
+//Получение протокола адреса (прим. mqtt://server.ru -> mqtt, https://server.ru => https, ...)
const getURLProtocol = sURL => {
- //Считываем протокол адреса
- return new URL(sURL).protocol.slice(0, -1);
+ //Если начинается с "/" - HTTP, иначе получаем из URL
+ return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1);
};
//-----------------
@@ -361,6 +401,8 @@ exports.parseOptionsXML = parseOptionsXML;
exports.buildOptionsXML = buildOptionsXML;
exports.getNowString = getNowString;
exports.deepMerge = deepMerge;
+exports.getKafkaConnectionSettings = getKafkaConnectionSettings;
+exports.getMQTTConnectionSettings = getMQTTConnectionSettings;
exports.getKafkaBroker = getKafkaBroker;
exports.getKafkaAuth = getKafkaAuth;
exports.getURLProtocol = getURLProtocol;
diff --git a/models/obj_service.js b/models/obj_service.js
index 5d86db2..598c0fa 100644
--- a/models/obj_service.js
+++ b/models/obj_service.js
@@ -33,6 +33,13 @@ const NIS_AUTH_NO = 0; //Неаутентифицирован
const SIS_AUTH_YES = "IS_AUTH_YES"; //Аутентифицирован (строковый код)
const SIS_AUTH_NO = "IS_AUTH_NO"; //Неаутентифицирован (строковый код)
+//Протоколы работы сервиса
+const SPROTOCOL_HTTP = "http"; //Протокол HTTP
+const SPROTOCOL_HTTPS = "https"; //Протокол HTTPS
+const SPROTOCOL_MQTT = "mqtt"; //Протокол MQTT
+const SPROTOCOL_MQTTS = "mqtts"; //Протокол MQTTS
+const SPROTOCOL_KAFKA = "kafka"; //Протокол для работы с KAFKA
+
//-------------
// Тело модуля
//-------------
@@ -59,6 +66,11 @@ exports.NIS_AUTH_YES = NIS_AUTH_YES;
exports.NIS_AUTH_NO = NIS_AUTH_NO;
exports.SIS_AUTH_YES = SIS_AUTH_YES;
exports.SIS_AUTH_NO = SIS_AUTH_NO;
+exports.SPROTOCOL_HTTP = SPROTOCOL_HTTP;
+exports.SPROTOCOL_HTTPS = SPROTOCOL_HTTPS;
+exports.SPROTOCOL_MQTT = SPROTOCOL_MQTT;
+exports.SPROTOCOL_MQTTS = SPROTOCOL_MQTTS;
+exports.SPROTOCOL_KAFKA = SPROTOCOL_KAFKA;
//Схема валидации сервиса
exports.Service = new Schema({
@@ -144,10 +156,8 @@ exports.Service = new Schema({
enum: [NUNAVLBL_NTF_SIGN_NO, NUNAVLBL_NTF_SIGN_YES],
required: true,
message: {
- type: path =>
- `Признак необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
- enum: path =>
- `Значение признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
+ type: path => `Признак необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
+ enum: path => `Значение признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
required: path => `Не указан признак необходимости оповещения о простое внешнего сервиса (${path})`
}
},
@@ -159,10 +169,8 @@ exports.Service = new Schema({
message: {
type: path =>
`Строковый код признака необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`,
- enum: path =>
- `Значение строкового кода признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
- required: path =>
- `Не указан строковый код признака необходимости оповещения о простое внешнего сервиса (${path})`
+ enum: path => `Значение строкового кода признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
+ required: path => `Не указан строковый код признака необходимости оповещения о простое внешнего сервиса (${path})`
}
},
//Максимальное время простоя (мин) удалённого сервиса для генерации оповещения
@@ -172,8 +180,7 @@ exports.Service = new Schema({
message: {
type: path =>
`Максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
- required: path =>
- `Не указано максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path})`
+ required: path => `Не указано максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path})`
}
},
//Список адресов E-Mail для оповещения о простое внешнего сервиса
@@ -194,8 +201,7 @@ exports.Service = new Schema({
type: String,
required: false,
message: {
- type: path =>
- `Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
+ type: path => `Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан адрес прокси-сервера в очереди обмена (${path})`
}
},
@@ -237,8 +243,7 @@ exports.ServiceCtx = new Schema({
type: String,
required: false,
message: {
- type: path =>
- `Строковое представление даты истечения контекста (${path}) имеет некорректный тип данных (ожидалось - String)`,
+ type: path => `Строковое представление даты истечения контекста (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указано строковое представление даты истечения контекста (${path})`
}
},
@@ -248,8 +253,7 @@ exports.ServiceCtx = new Schema({
enum: [NIS_AUTH_YES, NIS_AUTH_NO],
required: true,
message: {
- type: path =>
- `Признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
+ type: path => `Признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
enum: path => `Значение признака аутентицированности сервиса (${path}) не поддерживается`,
required: path => `Не указан признак аутентицированности сервиса (${path})`
}
@@ -260,8 +264,7 @@ exports.ServiceCtx = new Schema({
enum: [SIS_AUTH_YES, SIS_AUTH_NO],
required: true,
message: {
- type: path =>
- `Строковый код признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`,
+ type: path => `Строковый код признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение строкового кода признака аутентицированности сервиса (${path}) не поддерживается`,
required: path => `Не указан строковый код признака аутентицированности сервиса (${path})`
}
@@ -284,8 +287,7 @@ exports.ServiceExpiredQueueInfo = new Schema({
type: Number,
required: true,
message: {
- type: path =>
- `Количество просроченных сообщений обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
+ type: path => `Количество просроченных сообщений обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано количество просроченных сообщений обмена (${path})`
}
},
@@ -294,8 +296,7 @@ exports.ServiceExpiredQueueInfo = new Schema({
type: String,
required: true,
message: {
- type: path =>
- `Информация о просроченных сообщениях обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
+ type: path => `Информация о просроченных сообщениях обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана информация о просроченных сообщениях обмена (${path})`
}
}
diff --git a/models/obj_service_function.js b/models/obj_service_function.js
index 0be959d..a8e24c3 100644
--- a/models/obj_service_function.js
+++ b/models/obj_service_function.js
@@ -70,13 +70,6 @@ const NERR_NTF_SIGN_YES = 1; //Оповещать об ошибке исполн
const SERR_NTF_SIGN_NO = "ERR_NTF_SIGN_NO"; //Не оповещать об ошибке исполнения (строковый код)
const SERR_NTF_SIGN_YES = "ERR_NTF_SIGN_YES"; //Оповещать об ошибке исполнения (строковый код)
-//Протоколы работы сервиса
-const SPROTOCOL_HTTP = "http"; //Протокол HTTP
-const SPROTOCOL_HTTPS = "https"; //Протокол HTTPS
-const SPROTOCOL_MQTT = "mqtt"; //Протокол MQTT
-const SPROTOCOL_MQTTS = "mqtts"; //Протокол MQTTS
-const SPROTOCOL_KAFKA = "kafka"; //Протокол для работы с KAFKA
-
//-------------
// Тело модуля
//-------------
@@ -146,11 +139,6 @@ exports.NERR_NTF_SIGN_NO = NERR_NTF_SIGN_NO;
exports.NERR_NTF_SIGN_YES = NERR_NTF_SIGN_YES;
exports.SERR_NTF_SIGN_NO = SERR_NTF_SIGN_NO;
exports.SERR_NTF_SIGN_YES = SERR_NTF_SIGN_YES;
-exports.SPROTOCOL_HTTP = SPROTOCOL_HTTP;
-exports.SPROTOCOL_HTTPS = SPROTOCOL_HTTPS;
-exports.SPROTOCOL_MQTT = SPROTOCOL_MQTT;
-exports.SPROTOCOL_MQTTS = SPROTOCOL_MQTTS;
-exports.SPROTOCOL_KAFKA = SPROTOCOL_KAFKA;
//Схема валидации функции сервиса
exports.ServiceFunction = new Schema({