master #4

Merged
Mim merged 3 commits from Dollerok/P8-ExchangeService:master into master 2024-11-20 18:26:55 +03:00
5 changed files with 49 additions and 8 deletions

View File

@ -89,7 +89,20 @@ const kafka = [
//Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000
nInitialRetryTime: 10000,
//Использовать аутентификацию по SSL-сертификату
bAuthSSL: false,
//Параметры аутентификации по SSL-сертификату
ssl: {
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
bRejectUnauthorized: true,
//Путь к центру сертификации
sPathCa: "",
//Путь к закрытому ключу
sPathKey: "",
//Путь к сертификату
sPathCert: ""
}
}
];

View File

@ -89,7 +89,20 @@ const kafka = [
//Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000
nInitialRetryTime: 10000,
//Использовать аутентификацию по SSL-сертификату
bAuthSSL: false,
//Параметры аутентификации по SSL-сертификату
ssl: {
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
bRejectUnauthorized: true,
//Путь к центру сертификации
sPathCa: "",
//Путь к закрытому ключу
sPathKey: "",
//Путь к сертификату
sPathCert: ""
}
}
];

View File

@ -8,7 +8,7 @@
//----------------------
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
//------------
// Тело модуля
@ -25,7 +25,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
...auth
});
//Инициализируем продюсера
let producer = kafka.producer();
let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
//Подключаемся к Kafka
await producer.connect();
//Отправляем сообщение
@ -36,7 +36,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
return { statusCode: 200 };
};
//Получение MQTT сообщений
//Получение Kafka сообщений
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
try {
//Признак необходимости вывода сообщения о потере соединения
@ -48,7 +48,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
clientId: settings.sClientIdRecipient,
brokers: [sBroker],
connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass),
...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
logLevel: logLevel.NOTHING,
retry: {
retries: 0,

View File

@ -158,8 +158,8 @@ const appProcess = async prms => {
options.url = getKafkaBroker(prms.service.sSrvRoot);
options.body = prms.queue.blMsg;
options.topic = prms.function.sFnURL;
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass);
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass, options.settings);
//Если параметры подключения не считаны
if (!options.settings) {
//Расскажем об ошибке считывания
@ -321,6 +321,7 @@ const appProcess = async prms => {
delete tmpOptions.body;
delete tmpOptions.cert;
delete tmpOptions.key;
delete tmpOptions.auth;
//Конвертируем в XML
let sOptions = buildOptionsXML({ options: tmpOptions });
//Сохраняемв БД

View File

@ -7,6 +7,7 @@
// Подключение библиотек
//----------------------
const fs = require("fs"); //Работа с файлами
const _ = require("lodash"); //Работа с массивами и объектами
const os = require("os"); //Средства операционной системы
const xml2js = require("xml2js"); //Конвертация XML в JSON
@ -379,7 +380,20 @@ const getKafkaBroker = sURL => {
};
//Получение авторизации для Kafka
const getKafkaAuth = (sUser, sPass) => {
const getKafkaAuth = (sUser, sPass, kafka) => {
//Если аутентификация по SSL-сертификату
if (kafka.bAuthSSL) {
//Возвращаем авторизацию в формате SSL
return {
ssl: {
rejectUnauthorized: kafka.ssl.bRejectUnauthorized,
ca: kafka.ssl.sPathCa ? [fs.readFileSync(kafka.ssl.sPathCa, "utf-8")] : [],
key: kafka.ssl.sPathKey ? fs.readFileSync(kafka.ssl.sPathKey, "utf-8") : "",
cert: kafka.ssl.sPathCert ? fs.readFileSync(kafka.ssl.sPathCert, "utf-8") : ""
}
};
}
//Возвращаем авторизацию по пользователю, если необходимо
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
};