From 1743a2f70dce3b93bb40a6541b155343359bec6a Mon Sep 17 00:00:00 2001 From: Dollerino Date: Mon, 11 Nov 2024 15:28:36 +0300 Subject: [PATCH 1/2] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-926=20-=20=D0=9F?= =?UTF-8?q?=D0=BE=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B0=20SSL=20=D0=BF?= =?UTF-8?q?=D1=80=D0=B8=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B8=20=D0=BA=20Kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 15 ++++++++++++++- config_default.js | 15 ++++++++++++++- core/kafka_connector.js | 8 ++++---- core/out_queue_processor.js | 3 ++- core/utils.js | 16 +++++++++++++++- 5 files changed, 49 insertions(+), 8 deletions(-) diff --git a/config.js b/config.js index 7f6c4b3..37b000c 100644 --- a/config.js +++ b/config.js @@ -89,7 +89,20 @@ const kafka = [ //Время максимального ожидания между попытками переподключения (мс) nMaxRetryTime: 20000, //Время ожидания между попытками переподключения (мс) - nInitialRetryTime: 10000 + nInitialRetryTime: 10000, + //Использовать аутентификацию по SSL-сертификату + bAuthSSL: false, + //Параметры аутентификации по SSL-сертификату + ssl: { + //Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить) + bRejectUnauthorized: true, + //Путь к корневому сертификату + sPathCa: "", + //Путь к закрытому ключу + sPathKey: "", + //Путь к SSL сертификату + sPathCert: "" + } } ]; diff --git a/config_default.js b/config_default.js index cf940d0..b05f57e 100644 --- a/config_default.js +++ b/config_default.js @@ -89,7 +89,20 @@ const kafka = [ //Время максимального ожидания между попытками переподключения (мс) nMaxRetryTime: 20000, //Время ожидания между попытками переподключения (мс) - nInitialRetryTime: 10000 + nInitialRetryTime: 10000, + //Использовать аутентификацию по SSL-сертификату + bAuthSSL: false, + //Параметры аутентификации по SSL-сертификату + ssl: { + //Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить) + bRejectUnauthorized: true, + //Путь к корневому сертификату + sPathCa: "", + //Путь к закрытому ключу + sPathKey: "", + //Путь к SSL сертификату + sPathCert: "" + } } ]; diff --git a/core/kafka_connector.js b/core/kafka_connector.js index 90dcba6..e2b89c3 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -8,7 +8,7 @@ //---------------------- const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции -const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka +const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka //------------ // Тело модуля @@ -25,7 +25,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => { ...auth }); //Инициализируем продюсера - let producer = kafka.producer(); + let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); //Подключаемся к Kafka await producer.connect(); //Отправляем сообщение @@ -36,7 +36,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => { return { statusCode: 200 }; }; -//Получение MQTT сообщений +//Получение Kafka сообщений const subscribeKafka = async ({ settings, service, processMessage, logger }) => { try { //Признак необходимости вывода сообщения о потере соединения @@ -48,7 +48,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => clientId: settings.sClientIdRecipient, brokers: [sBroker], connectionTimeout: settings.nConnectionTimeout, - ...getKafkaAuth(service.sSrvUser, service.sSrvPass), + ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), logLevel: logLevel.NOTHING, retry: { retries: 0, diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 958ed26..bee715f 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -158,8 +158,8 @@ const appProcess = async prms => { options.url = getKafkaBroker(prms.service.sSrvRoot); options.body = prms.queue.blMsg; options.topic = prms.function.sFnURL; - options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass); options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka); + options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass, options.settings); //Если параметры подключения не считаны if (!options.settings) { //Расскажем об ошибке считывания @@ -321,6 +321,7 @@ const appProcess = async prms => { delete tmpOptions.body; delete tmpOptions.cert; delete tmpOptions.key; + delete tmpOptions.auth; //Конвертируем в XML let sOptions = buildOptionsXML({ options: tmpOptions }); //Сохраняемв БД diff --git a/core/utils.js b/core/utils.js index f2d39df..46ec904 100644 --- a/core/utils.js +++ b/core/utils.js @@ -7,6 +7,7 @@ // Подключение библиотек //---------------------- +const fs = require("fs"); //Работа с файлами const _ = require("lodash"); //Работа с массивами и объектами const os = require("os"); //Средства операционной системы const xml2js = require("xml2js"); //Конвертация XML в JSON @@ -379,7 +380,20 @@ const getKafkaBroker = sURL => { }; //Получение авторизации для Kafka -const getKafkaAuth = (sUser, sPass) => { +const getKafkaAuth = (sUser, sPass, kafka) => { + //Если аутентификация по SSL-сертификату + if (kafka.bAuthSSL) { + //Возвращаем авторизацию в формате SSL + return { + ssl: { + rejectUnauthorized: kafka.ssl.bRejectUnauthorized, + ca: kafka.ssl.sPathCa ? [fs.readFileSync(kafka.ssl.sPathCa, "utf-8")] : [], + key: kafka.ssl.sPathKey ? fs.readFileSync(kafka.ssl.sPathKey, "utf-8") : "", + cert: kafka.ssl.sPathCert ? fs.readFileSync(kafka.ssl.sPathCert, "utf-8") : "" + } + }; + } + //Возвращаем авторизацию по пользователю, если необходимо return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null; }; -- 2.34.1 From 230ab0978fbb475bc4b04d81e916d42df420ce24 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Tue, 12 Nov 2024 13:36:21 +0300 Subject: [PATCH 2/2] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-926=20-=20=D0=9F?= =?UTF-8?q?=D0=BE=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B0=20SSL=20=D0=BF?= =?UTF-8?q?=D1=80=D0=B8=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B8=20=D0=BA=20Kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 4 ++-- config_default.js | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config.js b/config.js index 37b000c..501858c 100644 --- a/config.js +++ b/config.js @@ -96,11 +96,11 @@ const kafka = [ ssl: { //Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить) bRejectUnauthorized: true, - //Путь к корневому сертификату + //Путь к центру сертификации sPathCa: "", //Путь к закрытому ключу sPathKey: "", - //Путь к SSL сертификату + //Путь к сертификату sPathCert: "" } } diff --git a/config_default.js b/config_default.js index b05f57e..f6d36cc 100644 --- a/config_default.js +++ b/config_default.js @@ -96,11 +96,11 @@ const kafka = [ ssl: { //Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить) bRejectUnauthorized: true, - //Путь к корневому сертификату + //Путь к центру сертификации sPathCa: "", //Путь к закрытому ключу sPathKey: "", - //Путь к SSL сертификату + //Путь к сертификату sPathCert: "" } } -- 2.34.1