ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA)

This commit is contained in:
Dollerino 2024-09-24 17:58:57 +03:00
parent 226fe582b6
commit 6ad825c2b6
11 changed files with 337 additions and 232 deletions

View File

@ -72,7 +72,10 @@ let inComing = {
}; };
//Параметры подключения к Kafka //Параметры подключения к Kafka
let kafkaConnection = { const kafka = [
{
//Мнемокод сервиса обмена (пусто - использовать по умолчанию)
sService: "",
//ID клиента-отправителя //ID клиента-отправителя
sClientIdSender: "Parus", sClientIdSender: "Parus",
//ID клиента-получателя //ID клиента-получателя
@ -82,15 +85,19 @@ let kafkaConnection = {
//Время ожидания успешного подключения (мс) //Время ожидания успешного подключения (мс)
nConnectionTimeout: 5000, nConnectionTimeout: 5000,
//Необходимость попытки переподключения при потере соединения //Необходимость попытки переподключения при потере соединения
bRestartOnFailure: false, bRestartOnFailure: true,
//Время максимального ожидания между попытками переподключения (мс) //Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: 20000, nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000 nInitialRetryTime: 10000
}; }
];
//Параметры подключения по MQTT протоколу //Параметры подключения к MQTT
let mqttConnection = { const mqtt = [
{
//Мнемокод сервиса обмена (пусто - использовать по умолчанию)
sService: "",
//ID клиента-отправителя //ID клиента-отправителя
sClientIdSender: "Parus", sClientIdSender: "Parus",
//ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться) //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться)
@ -99,7 +106,8 @@ let mqttConnection = {
nConnectTimeout: 5000, nConnectTimeout: 5000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nReconnectPeriod: 10000 nReconnectPeriod: 10000
}; }
];
//Параметры отправки E-Mail уведомлений //Параметры отправки E-Mail уведомлений
let mail = { let mail = {
@ -128,7 +136,7 @@ module.exports = {
dbConnect, dbConnect,
outGoing, outGoing,
inComing, inComing,
kafkaConnection, kafka,
mqttConnection, mqtt,
mail mail
}; };

View File

@ -72,23 +72,32 @@ let inComing = {
}; };
//Параметры подключения к Kafka //Параметры подключения к Kafka
let kafkaConnection = { const kafka = [
{
//Мнемокод сервиса обмена (пусто - использовать по умолчанию)
sService: "",
//ID клиента-отправителя //ID клиента-отправителя
sClientIdSender: "Parus", sClientIdSender: "Parus",
//ID клиента-получателя //ID клиента-получателя
sClientIdRecipient: "Parus", sClientIdRecipient: "Parus",
//Группа получателя
sGroupId: "Parus",
//Время ожидания успешного подключения (мс) //Время ожидания успешного подключения (мс)
nConnectionTimeout: 5000, nConnectionTimeout: 5000,
//Необходимость попытки переподключения при потере соединения //Необходимость попытки переподключения при потере соединения
bRestartOnFailure: false, bRestartOnFailure: true,
//Время максимального ожидания между попытками переподключения (мс) //Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: 20000, nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000 nInitialRetryTime: 10000
}; }
];
//Параметры подключения по MQTT протоколу //Параметры подключения к MQTT
let mqttConnection = { const mqtt = [
{
//Мнемокод сервиса обмена (пусто - использовать по умолчанию)
sService: "",
//ID клиента-отправителя //ID клиента-отправителя
sClientIdSender: "Parus", sClientIdSender: "Parus",
//ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться) //ID клиента-получателя (если равен sClientIdSender, то отправленные сообщения будут игнорироваться)
@ -97,7 +106,8 @@ let mqttConnection = {
nConnectTimeout: 5000, nConnectTimeout: 5000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nReconnectPeriod: 10000 nReconnectPeriod: 10000
}; }
];
//Параметры отправки E-Mail уведомлений //Параметры отправки E-Mail уведомлений
let mail = { let mail = {
@ -126,7 +136,7 @@ module.exports = {
dbConnect, dbConnect,
outGoing, outGoing,
inComing, inComing,
kafkaConnection, kafka,
mqttConnection, mqtt,
mail mail
}; };

View File

@ -96,7 +96,7 @@ class ParusAppServer {
//Запускаем обслуживание очереди входящих //Запускаем обслуживание очереди входящих
await this.logger.info("Запуск обработчика очереди входящих сообщений..."); await this.logger.info("Запуск обработчика очереди входящих сообщений...");
try { try {
this.inQ.startProcessing({ services: this.services }); await this.inQ.startProcessing({ services: this.services });
} catch (e) { } catch (e) {
await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); await this.logger.error(`Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
await this.stop(); await this.stop();
@ -218,8 +218,8 @@ class ParusAppServer {
logger: this.logger, logger: this.logger,
notifier: this.notifier, notifier: this.notifier,
sProxy: prms.config.outGoing.sProxy, sProxy: prms.config.outGoing.sProxy,
kafkaConnectionPrms: prms.config.kafkaConnection, kafka: prms.config.kafka,
mqttConnectionPrms: prms.config.mqttConnection mqtt: prms.config.mqtt
}); });
//Создаём обработчик очереди входящих //Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({ this.inQ = new iq.InQueue({
@ -228,8 +228,8 @@ class ParusAppServer {
dbConn: this.dbConn, dbConn: this.dbConn,
logger: this.logger, logger: this.logger,
notifier: this.notifier, notifier: this.notifier,
kafkaConnectionPrms: prms.config.kafkaConnection, kafka: prms.config.kafka,
mqttConnectionPrms: prms.config.mqttConnection mqtt: prms.config.mqtt
}); });
//Создаём контроллер доступности удалённых сервисов //Создаём контроллер доступности удалённых сервисов
this.srvAvlCtrl = new sac.ServiceAvailableController({ this.srvAvlCtrl = new sac.ServiceAvailableController({

View File

@ -21,7 +21,9 @@ const {
buildOptionsXML, buildOptionsXML,
parseOptionsXML, parseOptionsXML,
deepMerge, deepMerge,
getKafkaBroker getKafkaConnectionSettings,
getMQTTConnectionSettings,
getURLProtocol
} = require("./utils"); //Вспомогательные функции } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений
@ -83,9 +85,9 @@ class InQueue extends EventEmitter {
//WEB-сервер //WEB-сервер
this.srv = null; this.srv = null;
//Параметры подключения к Kafka //Параметры подключения к Kafka
this.kafkaConnectionPrms = _.cloneDeep(prms.kafkaConnectionPrms); this.kafka = prms.kafka;
//Параметры подключения к MQTT //Параметры подключения к MQTT
this.mqttConnectionPrms = _.cloneDeep(prms.mqttConnectionPrms); this.mqtt = prms.mqtt;
//Внешние подключения //Внешние подключения
this.kafkaConnections = []; this.kafkaConnections = [];
this.mqttConnections = []; this.mqttConnections = [];
@ -697,7 +699,7 @@ class InQueue extends EventEmitter {
} }
//Запуск обработки очереди входящих сообщений //Запуск обработки очереди входящих сообщений
startProcessing(prms) { async startProcessing(prms) {
//Проверяем структуру переданного объекта для старта //Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений"); let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений");
//Если структура объекта в норме //Если структура объекта в норме
@ -715,7 +717,14 @@ class InQueue extends EventEmitter {
//Конфигурируем сервер - обработка тела сообщения //Конфигурируем сервер - обработка тела сообщения
this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" }));
//Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений
_.forEach(_.filter(this.services, { nSrvType: objServiceSchema.NSRV_TYPE_RECIVE }), srvs => { _.forEach(
_.filter(this.services, srv => {
return (
srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE &&
[objServiceSchema.SPROTOCOL_HTTP, objServiceSchema.SPROTOCOL_HTTPS].includes(getURLProtocol(srv.sSrvRoot))
);
}),
srvs => {
//Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает
this.webApp.all(srvs.sSrvRoot, (req, res) => { this.webApp.all(srvs.sSrvRoot, (req, res) => {
res.status(200).send( res.status(200).send(
@ -763,21 +772,28 @@ class InQueue extends EventEmitter {
}); });
} }
); );
}); }
);
//Инициализируем настройки подключения
let connectionSettings = null;
//Считываем прием сообщений по Kafka //Считываем прием сообщений по Kafka
let kafkaSrvs = _.filter(this.services, srv => { let kafkaSrvs = this.services.filter(srv => {
return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && srv.sSrvRoot.startsWith("kafka://"); return srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && getURLProtocol(srv.sSrvRoot) === objServiceSchema.SPROTOCOL_KAFKA;
}); });
//Если есть сервисы с приемом сообщений по Kafka //Если есть сервисы с приемом сообщений по Kafka
if (kafkaSrvs.length !== 0) { if (kafkaSrvs.length !== 0) {
//Обходим данные сервисы //Обходим данные сервисы
_.forEach(kafkaSrvs, async srvs => { for (let srv of kafkaSrvs) {
//Если у сервиса обмена есть функции //Если у сервиса обмена есть функции
if (srvs.functions.length !== 0) { if (srv.functions.length !== 0) {
//Считываем настройки подключения к Kafka
connectionSettings = getKafkaConnectionSettings(srv.sCode, this.kafka);
//Если настройки подключения считаны
if (connectionSettings) {
//Подключаемся и подписываемся на соответствующий брокер //Подключаемся и подписываемся на соответствующий брокер
let connectionKafka = await subscribeKafka({ let connectionKafka = await subscribeKafka({
connectionPrms: this.kafkaConnectionPrms, settings: connectionSettings,
service: srvs, service: srv,
processKafkaMessage: prms => this.processKafkaMessage(prms), processKafkaMessage: prms => this.processKafkaMessage(prms),
logger: this.logger logger: this.logger
}); });
@ -786,25 +802,35 @@ class InQueue extends EventEmitter {
//Добавляем в общий список подключений kafka //Добавляем в общий список подключений kafka
this.kafkaConnections.push(connectionKafka); this.kafkaConnections.push(connectionKafka);
} }
} else {
await this.logger.error(
`Ошибка получения настроек подключения к Kafka для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").`
);
}
}
} }
});
} }
//Считываем прием сообщений по MQTT //Считываем прием сообщений по MQTT
let mqttSrvs = _.filter(this.services, srv => { let mqttSrvs = this.services.filter(srv => {
return ( return (
srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && (srv.sSrvRoot.startsWith("mqtt://") || srv.sSrvRoot.startsWith("mqtts://")) srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE &&
[objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(getURLProtocol(srv.sSrvRoot))
); );
}); });
//Если есть сервисы с приемом сообщений по MQTT //Если есть сервисы с приемом сообщений по MQTT
if (mqttSrvs.length !== 0) { if (mqttSrvs.length !== 0) {
//Обходим данные сервисы //Обходим данные сервисы
_.forEach(mqttSrvs, async srvs => { for (let srv of mqttSrvs) {
//Если у сервиса обмена есть функции //Если у сервиса обмена есть функции
if (srvs.functions.length !== 0) { if (srv.functions.length !== 0) {
//Считываем настройки подключения к MQTT
connectionSettings = getMQTTConnectionSettings(srv.sCode, this.mqtt);
//Если настройки подключения считаны
if (connectionSettings) {
//Подключаемся и подписываемся на соответствующий брокер //Подключаемся и подписываемся на соответствующий брокер
let connectionMQTT = await subscribeMQTT({ let connectionMQTT = await subscribeMQTT({
connectionPrms: this.mqttConnectionPrms, settings: connectionSettings,
service: srvs, service: srv,
processMQTTMessage: prms => this.processMQTTMessage(prms), processMQTTMessage: prms => this.processMQTTMessage(prms),
logger: this.logger logger: this.logger
}); });
@ -813,8 +839,13 @@ class InQueue extends EventEmitter {
//Добавляем в общий список подключений kafka //Добавляем в общий список подключений kafka
this.mqttConnections.push(connectionMQTT); this.mqttConnections.push(connectionMQTT);
} }
} else {
await this.logger.error(
`Ошибка получения настроек подключения к MQTT для сервиса "${srv.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").`
);
}
}
} }
});
} }
//Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND //Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND
this.webApp.use("*", (req, res) => { this.webApp.use("*", (req, res) => {

View File

@ -7,7 +7,6 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
@ -16,12 +15,12 @@ const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
//------------ //------------
//Отправка сообщения Kafka //Отправка сообщения Kafka
const publishKafka = async ({ connectionPrms, url, auth, topic, message }) => { const publishKafka = async ({ settings, url, auth, topic, message }) => {
//Иницализируем подключение к Kafka //Иницализируем подключение к Kafka
let kafka = new Kafka({ let kafka = new Kafka({
clientId: connectionPrms.sClientIdSender, clientId: settings.sClientIdSender,
brokers: [url], brokers: [url],
connectionTimeout: connectionPrms.nConnectionTimeout, connectionTimeout: settings.nConnectionTimeout,
logLevel: logLevel.NOTHING, logLevel: logLevel.NOTHING,
...auth ...auth
}); });
@ -30,15 +29,15 @@ const publishKafka = async ({ connectionPrms, url, auth, topic, message }) => {
//Подключаемся к Kafka //Подключаемся к Kafka
await producer.connect(); await producer.connect();
//Отправляем сообщение //Отправляем сообщение
let res = await producer.send({ topic: topic, messages: [{ value: message }] }); await producer.send({ topic: topic, messages: [{ value: message }] });
//Отключаемся //Отключаемся
await producer.disconnect(); await producer.disconnect();
//Возвращаем ответ //Возвращаем статус успешной отправки
return res; return { statusCode: 200 };
}; };
//Получение MQTT сообщений //Получение MQTT сообщений
const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, logger }) => { const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => {
try { try {
//Признак необходимости вывода сообщения о потере соединения //Признак необходимости вывода сообщения о потере соединения
let bLogLostConnection = true; let bLogLostConnection = true;
@ -46,15 +45,15 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo
let sBroker = getKafkaBroker(service.sSrvRoot); let sBroker = getKafkaBroker(service.sSrvRoot);
//Иницализируем подключение к Kafka //Иницализируем подключение к Kafka
let client = new Kafka({ let client = new Kafka({
clientId: connectionPrms.sClientIdRecipient, clientId: settings.sClientIdRecipient,
brokers: [sBroker], brokers: [sBroker],
connectionTimeout: connectionPrms.nConnectionTimeout, connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass), ...getKafkaAuth(service.sSrvUser, service.sSrvPass),
logLevel: logLevel.NOTHING, logLevel: logLevel.NOTHING,
retry: { retry: {
retries: 0, retries: 0,
maxRetryTime: connectionPrms.nMaxRetryTime, maxRetryTime: settings.nMaxRetryTime,
initialRetryTime: connectionPrms.nInitialRetryTime, initialRetryTime: settings.nInitialRetryTime,
restartOnFailure: error => { restartOnFailure: error => {
return new Promise(resolve => { return new Promise(resolve => {
//Если требуется вывести ошибку //Если требуется вывести ошибку
@ -64,19 +63,20 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo
//Сбрасываем признак необходимости вывода ошибки //Сбрасываем признак необходимости вывода ошибки
bLogLostConnection = false; bLogLostConnection = false;
} }
resolve(connectionPrms.bRestartOnFailure); resolve(settings.bRestartOnFailure);
}); });
} }
} }
}); });
//Инициализируем получателя //Инициализируем получателя
let consumer = client.consumer({ groupId: connectionPrms.sGroupId }); let consumer = client.consumer({ groupId: settings.sGroupId });
//Устанавливаем прослушивание //Устанавливаем прослушивание
await consumer.connect(); await consumer.connect();
consumer.subscribe({ topics: _.map(service.functions, "sFnURL") }); consumer.subscribe({
topics: service.functions.map(fn => {
return fn.sFnURL;
})
});
//Запускаем прослушивание необходимых топиков //Запускаем прослушивание необходимых топиков
consumer.run({ consumer.run({
eachMessage: async ({ topic, message }) => { eachMessage: async ({ topic, message }) => {
@ -85,14 +85,15 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo
processKafkaMessage({ processKafkaMessage({
message, message,
service, service,
fn: _.find(service.functions, { sFnURL: topic }) fn: service.functions.find(fn => {
return fn.sFnURL === topic;
})
}); });
} catch (e) { } catch (e) {
await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`); await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`);
} }
} }
}); });
//Отслеживаем соединение //Отслеживаем соединение
consumer.on(consumer.events.CONNECT, () => { consumer.on(consumer.events.CONNECT, () => {
//Если сообщение о потере соединения уже выводилось //Если сообщение о потере соединения уже выводилось

View File

@ -7,7 +7,6 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const { makeErrorText } = require("./utils"); //Вспомогательные функции const { makeErrorText } = require("./utils"); //Вспомогательные функции
const mqtt = require("mqtt"); //Работа с MQTT const mqtt = require("mqtt"); //Работа с MQTT
@ -16,48 +15,52 @@ const mqtt = require("mqtt"); //Работа с MQTT
//------------ //------------
//Отправка MQTT сообщения //Отправка MQTT сообщения
const publishMQTT = async ({ connectionPrms, url, auth, topic, message }) => { const publishMQTT = async ({ settings, url, auth, topic, message }) => {
//Инициализируем подключение //Инициализируем подключение
const client = await mqtt.connectAsync(url, { const client = await mqtt.connectAsync(url, {
clientId: connectionPrms.sClientIdSender, clientId: settings.sClientIdSender,
clean: true, clean: true,
connectTimeout: connectionPrms.nConnectTimeout, connectTimeout: settings.nConnectTimeout,
username: auth.user, username: auth.user,
password: auth.pass, password: auth.pass,
reconnectPeriod: connectionPrms.nReconnectPeriod reconnectPeriod: settings.nReconnectPeriod
}); });
//Отправляем сообщение //Отправляем сообщение
await client.publishAsync(topic, message); await client.publishAsync(topic, message);
//Закрываем подключение //Закрываем подключение
await client.endAsync(); await client.endAsync();
//Возвращаем сообщение, которое было отправлено //Возвращаем статус успешной отправки
return { statusCode: 200 }; return { statusCode: 200 };
}; };
//Получение MQTT сообщений //Получение MQTT сообщений
const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logger }) => { const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => {
try { try {
//Инициализируем строку подключения //Инициализируем строку подключения
let sBroker = service.sSrvRoot; let sBroker = service.sSrvRoot;
//Инициализируем подключение //Инициализируем подключение
const client = await mqtt.connectAsync(sBroker, { const client = await mqtt.connectAsync(sBroker, {
clientId: connectionPrms.sClientIdRecipient, clientId: settings.sClientIdRecipient,
clean: true, clean: true,
connectTimeout: connectionPrms.nConnectTimeout, connectTimeout: settings.nConnectTimeout,
username: service.sSrvUser, username: service.sSrvUser,
password: service.sSrvPass, password: service.sSrvPass,
reconnectPeriod: connectionPrms.nReconnectPeriod reconnectPeriod: settings.nReconnectPeriod
}); });
//Обходим функции сервиса //Обходим функции сервиса
_.forEach(service.functions, fn => { service.functions.forEach(fn => {
client.subscribe(fn.sFnURL); client.subscribe(fn.sFnURL);
}); });
//Прослушиваем сообщения //Прослушиваем сообщения
client.on("message", (topic, message) => { client.on("message", (topic, message) => {
//Обрабатываем сообщение //Обрабатываем сообщение
processMQTTMessage({ message, service, fn: _.find(service.functions, { sFnURL: topic }) }); processMQTTMessage({
message,
service,
fn: service.functions.find(fn => {
return fn.sFnURL === topic;
})
});
}); });
//Прослушиваем отключение от сервера //Прослушиваем отключение от сервера
client.on("offline", () => { client.on("offline", () => {
@ -69,7 +72,6 @@ const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logg
//Сообщим о восстановлении соединения //Сообщим о восстановлении соединения
logger.info(`Соединение с MQTT восстановлено (${sBroker})`); logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
}); });
//Возвращаем подключение //Возвращаем подключение
return client; return client;
} catch (e) { } catch (e) {

View File

@ -71,9 +71,9 @@ class OutQueue extends EventEmitter {
//Привяжем методы к указателю на себя для использования в обработчиках событий //Привяжем методы к указателю на себя для использования в обработчиках событий
this.outDetectingLoop = this.outDetectingLoop.bind(this); this.outDetectingLoop = this.outDetectingLoop.bind(this);
//Параметры подключения к Kafka //Параметры подключения к Kafka
this.kafkaConnectionPrms = _.cloneDeep(prms.kafkaConnectionPrms); this.kafka = prms.kafka;
//Параметры подключения к MQTT //Параметры подключения к MQTT
this.mqttConnectionPrms = _.cloneDeep(prms.mqttConnectionPrms); this.mqtt = prms.mqtt;
} else { } else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
} }
@ -164,8 +164,8 @@ class OutQueue extends EventEmitter {
nId: prms.queue.nServiceFnId nId: prms.queue.nServiceFnId
}), }),
sProxy: this.sProxy, sProxy: this.sProxy,
kafkaConnectionPrms: this.kafkaConnectionPrms, kafka: this.kafka,
mqttConnectionPrms: this.mqttConnectionPrms mqtt: this.mqtt
}); });
//Уменьшаем количество доступных обработчиков //Уменьшаем количество доступных обработчиков
this.nWorkersLeft--; this.nWorkersLeft--;

View File

@ -20,6 +20,8 @@ const {
parseOptionsXML, parseOptionsXML,
buildOptionsXML, buildOptionsXML,
deepMerge, deepMerge,
getKafkaConnectionSettings,
getMQTTConnectionSettings,
getKafkaBroker, getKafkaBroker,
getKafkaAuth, getKafkaAuth,
getURLProtocol getURLProtocol
@ -31,6 +33,7 @@ const objQueueSchema = require("../models/obj_queue"); //Схемы валида
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
const { const {
SERR_UNEXPECTED,
SERR_OBJECT_BAD_INTERFACE, SERR_OBJECT_BAD_INTERFACE,
SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_BEFORE,
SERR_APP_SERVER_AFTER, SERR_APP_SERVER_AFTER,
@ -151,20 +154,38 @@ const appProcess = async prms => {
//Исходя из протокола собираем параметры //Исходя из протокола собираем параметры
switch (true) { switch (true) {
//Kafka //Kafka
case sProtocol === objServiceFnSchema.SPROTOCOL_KAFKA: case sProtocol === objServiceSchema.SPROTOCOL_KAFKA:
options.url = getKafkaBroker(prms.service.sSrvRoot); options.url = getKafkaBroker(prms.service.sSrvRoot);
options.body = prms.queue.blMsg; options.body = prms.queue.blMsg;
options.topic = prms.function.sFnURL; options.topic = prms.function.sFnURL;
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass); options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass);
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
//Если параметры подключения не считаны
if (!options.settings) {
//Расскажем об ошибке считывания
throw new ServerError(
SERR_UNEXPECTED,
`Ошибка получения настроек подключения к Kafka для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").`
);
}
//Указываем, что выполнение обработчика "После" невозможно //Указываем, что выполнение обработчика "После" невозможно
bExecuteAfter = false; bExecuteAfter = false;
break; break;
//mqtt и mqtts //mqtt и mqtts
case [objServiceFnSchema.SPROTOCOL_MQTT, objServiceFnSchema.SPROTOCOL_MQTTS].includes(sProtocol): case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
options.url = prms.service.sSrvRoot; options.url = prms.service.sSrvRoot;
options.body = prms.queue.blMsg; options.body = prms.queue.blMsg;
options.topic = prms.function.sFnURL; options.topic = prms.function.sFnURL;
options.auth = { user: prms.service.sSrvUser, pass: prms.service.sSrvPass }; options.auth = { user: prms.service.sSrvUser, pass: prms.service.sSrvPass };
options.settings = getMQTTConnectionSettings(prms.service.sCode, prms.mqtt);
//Если параметры подключения не считаны
if (!options.settings) {
//Расскажем об ошибке считывания
throw new ServerError(
SERR_UNEXPECTED,
`Ошибка получения настроек подключения к MQTT для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").`
);
}
//Указываем, что выполнение обработчика "После" невозможно //Указываем, что выполнение обработчика "После" невозможно
bExecuteAfter = false; bExecuteAfter = false;
break; break;
@ -245,11 +266,9 @@ const appProcess = async prms => {
objServiceFnSchema.NFN_PRMS_TYPE_PATCH, objServiceFnSchema.NFN_PRMS_TYPE_PATCH,
objServiceFnSchema.NFN_PRMS_TYPE_PUT objServiceFnSchema.NFN_PRMS_TYPE_PUT
].includes(prms.function.nFnPrmsType) || ].includes(prms.function.nFnPrmsType) ||
[ [objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
objServiceFnSchema.SPROTOCOL_KAFKA, sProtocol
objServiceFnSchema.SPROTOCOL_MQTT, )
objServiceFnSchema.SPROTOCOL_MQTTS
].includes(sProtocol)
) { ) {
options.body = prms.queue.blMsg; options.body = prms.queue.blMsg;
} else { } else {
@ -311,25 +330,25 @@ const appProcess = async prms => {
nQueueId: prms.queue.nId nQueueId: prms.queue.nId
}); });
} }
//Ждем ответ от удалённого сервера //Инициализируем ответ от сервера
options.resolveWithFullResponse = true;
let serverResp = null; let serverResp = null;
//Выполняем отправку исходя из протокола //Выполняем отправку исходя из протокола
switch (true) { switch (true) {
//Kafka //Kafka
case sProtocol === objServiceFnSchema.SPROTOCOL_KAFKA: case sProtocol === objServiceSchema.SPROTOCOL_KAFKA:
serverResp = await publishKafka({ serverResp = await publishKafka({
connectionPrms: prms.kafkaConnectionPrms, settings: options.settings,
url: options.url, url: options.url,
auth: options.auth, auth: options.auth,
topic: options.topic, topic: options.topic,
message: options.body message: options.body
}); });
console.log(serverResp);
break; break;
//mqtt и mqtts //mqtt и mqtts
case [objServiceFnSchema.SPROTOCOL_MQTT, objServiceFnSchema.SPROTOCOL_MQTTS].includes(sProtocol): case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
serverResp = await publishMQTT({ serverResp = await publishMQTT({
connectionPrms: prms.mqttConnectionPrms, settings: options.settings,
url: options.url, url: options.url,
auth: options.auth, auth: options.auth,
topic: options.topic, topic: options.topic,
@ -338,6 +357,9 @@ const appProcess = async prms => {
break; break;
//Другие //Другие
default: default:
//Ждем ответ от удалённого сервера
options.resolveWithFullResponse = true;
//Отправляем запрос
serverResp = await rqp(options); serverResp = await rqp(options);
break; break;
} }
@ -593,8 +615,8 @@ const processTask = async prms => {
service: prms.task.service, service: prms.task.service,
function: prms.task.function, function: prms.task.function,
sProxy: prms.task.sProxy, sProxy: prms.task.sProxy,
kafkaConnectionPrms: prms.task.kafkaConnectionPrms, kafka: prms.task.kafka,
mqttConnectionPrms: prms.task.mqttConnectionPrms mqtt: prms.task.mqtt
}); });
//Если результат обработки ошибка - пробрасываем её дальше //Если результат обработки ошибка - пробрасываем её дальше
if (res instanceof ServerError) { if (res instanceof ServerError) {

View File

@ -22,6 +22,7 @@ const {
} = require("./constants"); //Глобавльные константы системы } = require("./constants"); //Глобавльные константы системы
const { ServerError } = require("./server_errors"); //Ошибка сервера const { ServerError } = require("./server_errors"); //Ошибка сервера
const prmsUtilsSchema = require("../models/prms_utils"); //Схемы валидации параметров функций const prmsUtilsSchema = require("../models/prms_utils"); //Схемы валидации параметров функций
const { SPROTOCOL_HTTP, SPROTOCOL_KAFKA } = require("../models/obj_service"); //Схемы валидации сервиса
//------------ //------------
// Тело модуля // Тело модуля
@ -326,10 +327,49 @@ const deepMerge = (...args) => {
return res; return res;
}; };
//Получение брокера Kafka по адресу сервиса обмена //Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...])
const getConnectionSettings = (service, settingsArray) => {
//Считываем параметры и возвращаем
return settingsArray.find(connection => {
return connection.sService === service;
});
};
//Считывание параметров подключения к Kafka для сервиса обмена (kafka - массив объектов [{sService: "", ...},...])
const getKafkaConnectionSettings = (service, kafka) => {
//Считываем подключение с указанным сервисом обмена
let kafkaConnection = getConnectionSettings(service, kafka);
//Если нет подключения с указанным сервисом обмена
if (!kafkaConnection) {
//Считываем "По умолчанию"
kafkaConnection = getConnectionSettings("", kafka);
}
//Вернем результат
return kafkaConnection;
};
//Считывание параметров подключения к MQTT для сервиса обмена (mqtt - массив объектов [{sService: "", ...},...])
const getMQTTConnectionSettings = (service, mqtt) => {
//Считываем подключение с указанным сервисом обмена
let mqttConnection = getConnectionSettings(service, mqtt);
//Если нет подключения с указанным сервисом обмена
if (!mqttConnection) {
//Считываем "По умолчанию"
mqttConnection = getConnectionSettings("", mqtt);
}
//Вернем результат
return mqttConnection;
};
//Получение брокера Kafka по адресу сервиса обмена (прим. kafka://server.ru -> server.ru, https://server.ru => undefined)
const getKafkaBroker = sURL => { const getKafkaBroker = sURL => {
//Убираем лишние символы //Если протокол URL - Kafka
if (getURLProtocol(sURL) === SPROTOCOL_KAFKA) {
//Возвращаем брокера
return sURL.slice(8); return sURL.slice(8);
}
//Возвращаем undefined
return;
}; };
//Получение авторизации для Kafka //Получение авторизации для Kafka
@ -337,10 +377,10 @@ const getKafkaAuth = (sUser, sPass) => {
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null; return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
}; };
//Получение протокола адреса //Получение протокола адреса (прим. mqtt://server.ru -> mqtt, https://server.ru => https, ...)
const getURLProtocol = sURL => { const getURLProtocol = sURL => {
//Считываем протокол адреса //Если начинается с "/" - HTTP, иначе получаем из URL
return new URL(sURL).protocol.slice(0, -1); return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1);
}; };
//----------------- //-----------------
@ -361,6 +401,8 @@ exports.parseOptionsXML = parseOptionsXML;
exports.buildOptionsXML = buildOptionsXML; exports.buildOptionsXML = buildOptionsXML;
exports.getNowString = getNowString; exports.getNowString = getNowString;
exports.deepMerge = deepMerge; exports.deepMerge = deepMerge;
exports.getKafkaConnectionSettings = getKafkaConnectionSettings;
exports.getMQTTConnectionSettings = getMQTTConnectionSettings;
exports.getKafkaBroker = getKafkaBroker; exports.getKafkaBroker = getKafkaBroker;
exports.getKafkaAuth = getKafkaAuth; exports.getKafkaAuth = getKafkaAuth;
exports.getURLProtocol = getURLProtocol; exports.getURLProtocol = getURLProtocol;

View File

@ -33,6 +33,13 @@ const NIS_AUTH_NO = 0; //Неаутентифицирован
const SIS_AUTH_YES = "IS_AUTH_YES"; //Аутентифицирован (строковый код) const SIS_AUTH_YES = "IS_AUTH_YES"; //Аутентифицирован (строковый код)
const SIS_AUTH_NO = "IS_AUTH_NO"; //Неаутентифицирован (строковый код) const SIS_AUTH_NO = "IS_AUTH_NO"; //Неаутентифицирован (строковый код)
//Протоколы работы сервиса
const SPROTOCOL_HTTP = "http"; //Протокол HTTP
const SPROTOCOL_HTTPS = "https"; //Протокол HTTPS
const SPROTOCOL_MQTT = "mqtt"; //Протокол MQTT
const SPROTOCOL_MQTTS = "mqtts"; //Протокол MQTTS
const SPROTOCOL_KAFKA = "kafka"; //Протокол для работы с KAFKA
//------------- //-------------
// Тело модуля // Тело модуля
//------------- //-------------
@ -59,6 +66,11 @@ exports.NIS_AUTH_YES = NIS_AUTH_YES;
exports.NIS_AUTH_NO = NIS_AUTH_NO; exports.NIS_AUTH_NO = NIS_AUTH_NO;
exports.SIS_AUTH_YES = SIS_AUTH_YES; exports.SIS_AUTH_YES = SIS_AUTH_YES;
exports.SIS_AUTH_NO = SIS_AUTH_NO; exports.SIS_AUTH_NO = SIS_AUTH_NO;
exports.SPROTOCOL_HTTP = SPROTOCOL_HTTP;
exports.SPROTOCOL_HTTPS = SPROTOCOL_HTTPS;
exports.SPROTOCOL_MQTT = SPROTOCOL_MQTT;
exports.SPROTOCOL_MQTTS = SPROTOCOL_MQTTS;
exports.SPROTOCOL_KAFKA = SPROTOCOL_KAFKA;
//Схема валидации сервиса //Схема валидации сервиса
exports.Service = new Schema({ exports.Service = new Schema({
@ -144,10 +156,8 @@ exports.Service = new Schema({
enum: [NUNAVLBL_NTF_SIGN_NO, NUNAVLBL_NTF_SIGN_YES], enum: [NUNAVLBL_NTF_SIGN_NO, NUNAVLBL_NTF_SIGN_YES],
required: true, required: true,
message: { message: {
type: path => type: path => `Признак необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
`Признак необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`, enum: path => `Значение признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
enum: path =>
`Значение признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
required: path => `Не указан признак необходимости оповещения о простое внешнего сервиса (${path})` required: path => `Не указан признак необходимости оповещения о простое внешнего сервиса (${path})`
} }
}, },
@ -159,10 +169,8 @@ exports.Service = new Schema({
message: { message: {
type: path => type: path =>
`Строковый код признака необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`, `Строковый код признака необходимости оповещения о простое внешнего сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => enum: path => `Значение строкового кода признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`,
`Значение строкового кода признака необходимости оповещения о простое внешнего сервиса (${path}) не поддерживается`, required: path => `Не указан строковый код признака необходимости оповещения о простое внешнего сервиса (${path})`
required: path =>
`Не указан строковый код признака необходимости оповещения о простое внешнего сервиса (${path})`
} }
}, },
//Максимальное время простоя (мин) удалённого сервиса для генерации оповещения //Максимальное время простоя (мин) удалённого сервиса для генерации оповещения
@ -172,8 +180,7 @@ exports.Service = new Schema({
message: { message: {
type: path => type: path =>
`Максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path}) имеет некорректный тип данных (ожидалось - Number)`, `Максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => required: path => `Не указано максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path})`
`Не указано максимальное время простоя (мин) удалённого сервиса для генерации оповещения (${path})`
} }
}, },
//Список адресов E-Mail для оповещения о простое внешнего сервиса //Список адресов E-Mail для оповещения о простое внешнего сервиса
@ -194,8 +201,7 @@ exports.Service = new Schema({
type: String, type: String,
required: false, required: false,
message: { message: {
type: path => type: path => `Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
`Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан адрес прокси-сервера в очереди обмена (${path})` required: path => `Не указан адрес прокси-сервера в очереди обмена (${path})`
} }
}, },
@ -237,8 +243,7 @@ exports.ServiceCtx = new Schema({
type: String, type: String,
required: false, required: false,
message: { message: {
type: path => type: path => `Строковое представление даты истечения контекста (${path}) имеет некорректный тип данных (ожидалось - String)`,
`Строковое представление даты истечения контекста (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указано строковое представление даты истечения контекста (${path})` required: path => `Не указано строковое представление даты истечения контекста (${path})`
} }
}, },
@ -248,8 +253,7 @@ exports.ServiceCtx = new Schema({
enum: [NIS_AUTH_YES, NIS_AUTH_NO], enum: [NIS_AUTH_YES, NIS_AUTH_NO],
required: true, required: true,
message: { message: {
type: path => type: path => `Признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
`Признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`,
enum: path => `Значение признака аутентицированности сервиса (${path}) не поддерживается`, enum: path => `Значение признака аутентицированности сервиса (${path}) не поддерживается`,
required: path => `Не указан признак аутентицированности сервиса (${path})` required: path => `Не указан признак аутентицированности сервиса (${path})`
} }
@ -260,8 +264,7 @@ exports.ServiceCtx = new Schema({
enum: [SIS_AUTH_YES, SIS_AUTH_NO], enum: [SIS_AUTH_YES, SIS_AUTH_NO],
required: true, required: true,
message: { message: {
type: path => type: path => `Строковый код признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`,
`Строковый код признака аутентицированности сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение строкового кода признака аутентицированности сервиса (${path}) не поддерживается`, enum: path => `Значение строкового кода признака аутентицированности сервиса (${path}) не поддерживается`,
required: path => `Не указан строковый код признака аутентицированности сервиса (${path})` required: path => `Не указан строковый код признака аутентицированности сервиса (${path})`
} }
@ -284,8 +287,7 @@ exports.ServiceExpiredQueueInfo = new Schema({
type: Number, type: Number,
required: true, required: true,
message: { message: {
type: path => type: path => `Количество просроченных сообщений обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
`Количество просроченных сообщений обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано количество просроченных сообщений обмена (${path})` required: path => `Не указано количество просроченных сообщений обмена (${path})`
} }
}, },
@ -294,8 +296,7 @@ exports.ServiceExpiredQueueInfo = new Schema({
type: String, type: String,
required: true, required: true,
message: { message: {
type: path => type: path => `Информация о просроченных сообщениях обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
`Информация о просроченных сообщениях обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана информация о просроченных сообщениях обмена (${path})` required: path => `Не указана информация о просроченных сообщениях обмена (${path})`
} }
} }

View File

@ -70,13 +70,6 @@ const NERR_NTF_SIGN_YES = 1; //Оповещать об ошибке исполн
const SERR_NTF_SIGN_NO = "ERR_NTF_SIGN_NO"; //Не оповещать об ошибке исполнения (строковый код) const SERR_NTF_SIGN_NO = "ERR_NTF_SIGN_NO"; //Не оповещать об ошибке исполнения (строковый код)
const SERR_NTF_SIGN_YES = "ERR_NTF_SIGN_YES"; //Оповещать об ошибке исполнения (строковый код) const SERR_NTF_SIGN_YES = "ERR_NTF_SIGN_YES"; //Оповещать об ошибке исполнения (строковый код)
//Протоколы работы сервиса
const SPROTOCOL_HTTP = "http"; //Протокол HTTP
const SPROTOCOL_HTTPS = "https"; //Протокол HTTPS
const SPROTOCOL_MQTT = "mqtt"; //Протокол MQTT
const SPROTOCOL_MQTTS = "mqtts"; //Протокол MQTTS
const SPROTOCOL_KAFKA = "kafka"; //Протокол для работы с KAFKA
//------------- //-------------
// Тело модуля // Тело модуля
//------------- //-------------
@ -146,11 +139,6 @@ exports.NERR_NTF_SIGN_NO = NERR_NTF_SIGN_NO;
exports.NERR_NTF_SIGN_YES = NERR_NTF_SIGN_YES; exports.NERR_NTF_SIGN_YES = NERR_NTF_SIGN_YES;
exports.SERR_NTF_SIGN_NO = SERR_NTF_SIGN_NO; exports.SERR_NTF_SIGN_NO = SERR_NTF_SIGN_NO;
exports.SERR_NTF_SIGN_YES = SERR_NTF_SIGN_YES; exports.SERR_NTF_SIGN_YES = SERR_NTF_SIGN_YES;
exports.SPROTOCOL_HTTP = SPROTOCOL_HTTP;
exports.SPROTOCOL_HTTPS = SPROTOCOL_HTTPS;
exports.SPROTOCOL_MQTT = SPROTOCOL_MQTT;
exports.SPROTOCOL_MQTTS = SPROTOCOL_MQTTS;
exports.SPROTOCOL_KAFKA = SPROTOCOL_KAFKA;
//Схема валидации функции сервиса //Схема валидации функции сервиса
exports.ServiceFunction = new Schema({ exports.ServiceFunction = new Schema({