master #4

Merged
Mim merged 3 commits from Dollerok/P8-ExchangeService:master into master 2024-11-20 18:26:55 +03:00
5 changed files with 49 additions and 8 deletions

View File

@ -89,7 +89,20 @@ const kafka = [
//Время максимального ожидания между попытками переподключения (мс) //Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: 20000, nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000 nInitialRetryTime: 10000,
//Использовать аутентификацию по SSL-сертификату
bAuthSSL: false,
//Параметры аутентификации по SSL-сертификату
ssl: {
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
bRejectUnauthorized: true,
//Путь к центру сертификации
sPathCa: "",
//Путь к закрытому ключу
sPathKey: "",
//Путь к сертификату
sPathCert: ""
}
} }
]; ];

View File

@ -89,7 +89,20 @@ const kafka = [
//Время максимального ожидания между попытками переподключения (мс) //Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: 20000, nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000 nInitialRetryTime: 10000,
//Использовать аутентификацию по SSL-сертификату
bAuthSSL: false,
//Параметры аутентификации по SSL-сертификату
ssl: {
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
bRejectUnauthorized: true,
//Путь к центру сертификации
sPathCa: "",
//Путь к закрытому ключу
sPathKey: "",
//Путь к сертификату
sPathCert: ""
}
} }
]; ];

View File

@ -8,7 +8,7 @@
//---------------------- //----------------------
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
//------------ //------------
// Тело модуля // Тело модуля
@ -25,7 +25,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
...auth ...auth
}); });
//Инициализируем продюсера //Инициализируем продюсера
let producer = kafka.producer(); let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
//Подключаемся к Kafka //Подключаемся к Kafka
await producer.connect(); await producer.connect();
//Отправляем сообщение //Отправляем сообщение
@ -36,7 +36,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
return { statusCode: 200 }; return { statusCode: 200 };
}; };
//Получение MQTT сообщений //Получение Kafka сообщений
const subscribeKafka = async ({ settings, service, processMessage, logger }) => { const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
try { try {
//Признак необходимости вывода сообщения о потере соединения //Признак необходимости вывода сообщения о потере соединения
@ -48,7 +48,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
clientId: settings.sClientIdRecipient, clientId: settings.sClientIdRecipient,
brokers: [sBroker], brokers: [sBroker],
connectionTimeout: settings.nConnectionTimeout, connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass), ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
logLevel: logLevel.NOTHING, logLevel: logLevel.NOTHING,
retry: { retry: {
retries: 0, retries: 0,

View File

@ -158,8 +158,8 @@ const appProcess = async prms => {
options.url = getKafkaBroker(prms.service.sSrvRoot); options.url = getKafkaBroker(prms.service.sSrvRoot);
options.body = prms.queue.blMsg; options.body = prms.queue.blMsg;
options.topic = prms.function.sFnURL; options.topic = prms.function.sFnURL;
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass);
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka); options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass, options.settings);
//Если параметры подключения не считаны //Если параметры подключения не считаны
if (!options.settings) { if (!options.settings) {
//Расскажем об ошибке считывания //Расскажем об ошибке считывания
@ -321,6 +321,7 @@ const appProcess = async prms => {
delete tmpOptions.body; delete tmpOptions.body;
delete tmpOptions.cert; delete tmpOptions.cert;
delete tmpOptions.key; delete tmpOptions.key;
delete tmpOptions.auth;
//Конвертируем в XML //Конвертируем в XML
let sOptions = buildOptionsXML({ options: tmpOptions }); let sOptions = buildOptionsXML({ options: tmpOptions });
//Сохраняемв БД //Сохраняемв БД

View File

@ -7,6 +7,7 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
const fs = require("fs"); //Работа с файлами
const _ = require("lodash"); //Работа с массивами и объектами const _ = require("lodash"); //Работа с массивами и объектами
const os = require("os"); //Средства операционной системы const os = require("os"); //Средства операционной системы
const xml2js = require("xml2js"); //Конвертация XML в JSON const xml2js = require("xml2js"); //Конвертация XML в JSON
@ -379,7 +380,20 @@ const getKafkaBroker = sURL => {
}; };
//Получение авторизации для Kafka //Получение авторизации для Kafka
const getKafkaAuth = (sUser, sPass) => { const getKafkaAuth = (sUser, sPass, kafka) => {
//Если аутентификация по SSL-сертификату
if (kafka.bAuthSSL) {
//Возвращаем авторизацию в формате SSL
return {
ssl: {
rejectUnauthorized: kafka.ssl.bRejectUnauthorized,
ca: kafka.ssl.sPathCa ? [fs.readFileSync(kafka.ssl.sPathCa, "utf-8")] : [],
key: kafka.ssl.sPathKey ? fs.readFileSync(kafka.ssl.sPathKey, "utf-8") : "",
cert: kafka.ssl.sPathCert ? fs.readFileSync(kafka.ssl.sPathCert, "utf-8") : ""
}
};
}
//Возвращаем авторизацию по пользователю, если необходимо
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null; return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
}; };