ЦИТК-926 - Поддержка SSL при подключении к Kafka
This commit is contained in:
parent
4440345fba
commit
1743a2f70d
15
config.js
15
config.js
@ -89,7 +89,20 @@ const kafka = [
|
||||
//Время максимального ожидания между попытками переподключения (мс)
|
||||
nMaxRetryTime: 20000,
|
||||
//Время ожидания между попытками переподключения (мс)
|
||||
nInitialRetryTime: 10000
|
||||
nInitialRetryTime: 10000,
|
||||
//Использовать аутентификацию по SSL-сертификату
|
||||
bAuthSSL: false,
|
||||
//Параметры аутентификации по SSL-сертификату
|
||||
ssl: {
|
||||
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
|
||||
bRejectUnauthorized: true,
|
||||
//Путь к корневому сертификату
|
||||
sPathCa: "",
|
||||
//Путь к закрытому ключу
|
||||
sPathKey: "",
|
||||
//Путь к SSL сертификату
|
||||
sPathCert: ""
|
||||
}
|
||||
}
|
||||
];
|
||||
|
||||
|
@ -89,7 +89,20 @@ const kafka = [
|
||||
//Время максимального ожидания между попытками переподключения (мс)
|
||||
nMaxRetryTime: 20000,
|
||||
//Время ожидания между попытками переподключения (мс)
|
||||
nInitialRetryTime: 10000
|
||||
nInitialRetryTime: 10000,
|
||||
//Использовать аутентификацию по SSL-сертификату
|
||||
bAuthSSL: false,
|
||||
//Параметры аутентификации по SSL-сертификату
|
||||
ssl: {
|
||||
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
|
||||
bRejectUnauthorized: true,
|
||||
//Путь к корневому сертификату
|
||||
sPathCa: "",
|
||||
//Путь к закрытому ключу
|
||||
sPathKey: "",
|
||||
//Путь к SSL сертификату
|
||||
sPathCert: ""
|
||||
}
|
||||
}
|
||||
];
|
||||
|
||||
|
@ -8,7 +8,7 @@
|
||||
//----------------------
|
||||
|
||||
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
|
||||
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
|
||||
const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
|
||||
|
||||
//------------
|
||||
// Тело модуля
|
||||
@ -25,7 +25,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
|
||||
...auth
|
||||
});
|
||||
//Инициализируем продюсера
|
||||
let producer = kafka.producer();
|
||||
let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
|
||||
//Подключаемся к Kafka
|
||||
await producer.connect();
|
||||
//Отправляем сообщение
|
||||
@ -36,7 +36,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
|
||||
return { statusCode: 200 };
|
||||
};
|
||||
|
||||
//Получение MQTT сообщений
|
||||
//Получение Kafka сообщений
|
||||
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
|
||||
try {
|
||||
//Признак необходимости вывода сообщения о потере соединения
|
||||
@ -48,7 +48,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
|
||||
clientId: settings.sClientIdRecipient,
|
||||
brokers: [sBroker],
|
||||
connectionTimeout: settings.nConnectionTimeout,
|
||||
...getKafkaAuth(service.sSrvUser, service.sSrvPass),
|
||||
...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
|
||||
logLevel: logLevel.NOTHING,
|
||||
retry: {
|
||||
retries: 0,
|
||||
|
@ -158,8 +158,8 @@ const appProcess = async prms => {
|
||||
options.url = getKafkaBroker(prms.service.sSrvRoot);
|
||||
options.body = prms.queue.blMsg;
|
||||
options.topic = prms.function.sFnURL;
|
||||
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass);
|
||||
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
|
||||
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass, options.settings);
|
||||
//Если параметры подключения не считаны
|
||||
if (!options.settings) {
|
||||
//Расскажем об ошибке считывания
|
||||
@ -321,6 +321,7 @@ const appProcess = async prms => {
|
||||
delete tmpOptions.body;
|
||||
delete tmpOptions.cert;
|
||||
delete tmpOptions.key;
|
||||
delete tmpOptions.auth;
|
||||
//Конвертируем в XML
|
||||
let sOptions = buildOptionsXML({ options: tmpOptions });
|
||||
//Сохраняемв БД
|
||||
|
@ -7,6 +7,7 @@
|
||||
// Подключение библиотек
|
||||
//----------------------
|
||||
|
||||
const fs = require("fs"); //Работа с файлами
|
||||
const _ = require("lodash"); //Работа с массивами и объектами
|
||||
const os = require("os"); //Средства операционной системы
|
||||
const xml2js = require("xml2js"); //Конвертация XML в JSON
|
||||
@ -379,7 +380,20 @@ const getKafkaBroker = sURL => {
|
||||
};
|
||||
|
||||
//Получение авторизации для Kafka
|
||||
const getKafkaAuth = (sUser, sPass) => {
|
||||
const getKafkaAuth = (sUser, sPass, kafka) => {
|
||||
//Если аутентификация по SSL-сертификату
|
||||
if (kafka.bAuthSSL) {
|
||||
//Возвращаем авторизацию в формате SSL
|
||||
return {
|
||||
ssl: {
|
||||
rejectUnauthorized: kafka.ssl.bRejectUnauthorized,
|
||||
ca: kafka.ssl.sPathCa ? [fs.readFileSync(kafka.ssl.sPathCa, "utf-8")] : [],
|
||||
key: kafka.ssl.sPathKey ? fs.readFileSync(kafka.ssl.sPathKey, "utf-8") : "",
|
||||
cert: kafka.ssl.sPathCert ? fs.readFileSync(kafka.ssl.sPathCert, "utf-8") : ""
|
||||
}
|
||||
};
|
||||
}
|
||||
//Возвращаем авторизацию по пользователю, если необходимо
|
||||
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user