master #4
15
config.js
15
config.js
@ -89,7 +89,20 @@ const kafka = [
|
|||||||
//Время максимального ожидания между попытками переподключения (мс)
|
//Время максимального ожидания между попытками переподключения (мс)
|
||||||
nMaxRetryTime: 20000,
|
nMaxRetryTime: 20000,
|
||||||
//Время ожидания между попытками переподключения (мс)
|
//Время ожидания между попытками переподключения (мс)
|
||||||
nInitialRetryTime: 10000
|
nInitialRetryTime: 10000,
|
||||||
|
//Использовать аутентификацию по SSL-сертификату
|
||||||
|
bAuthSSL: false,
|
||||||
|
//Параметры аутентификации по SSL-сертификату
|
||||||
|
ssl: {
|
||||||
|
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
|
||||||
|
bRejectUnauthorized: true,
|
||||||
|
//Путь к центру сертификации
|
||||||
|
sPathCa: "",
|
||||||
|
//Путь к закрытому ключу
|
||||||
|
sPathKey: "",
|
||||||
|
//Путь к сертификату
|
||||||
|
sPathCert: ""
|
||||||
|
}
|
||||||
}
|
}
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@ -89,7 +89,20 @@ const kafka = [
|
|||||||
//Время максимального ожидания между попытками переподключения (мс)
|
//Время максимального ожидания между попытками переподключения (мс)
|
||||||
nMaxRetryTime: 20000,
|
nMaxRetryTime: 20000,
|
||||||
//Время ожидания между попытками переподключения (мс)
|
//Время ожидания между попытками переподключения (мс)
|
||||||
nInitialRetryTime: 10000
|
nInitialRetryTime: 10000,
|
||||||
|
//Использовать аутентификацию по SSL-сертификату
|
||||||
|
bAuthSSL: false,
|
||||||
|
//Параметры аутентификации по SSL-сертификату
|
||||||
|
ssl: {
|
||||||
|
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
|
||||||
|
bRejectUnauthorized: true,
|
||||||
|
//Путь к центру сертификации
|
||||||
|
sPathCa: "",
|
||||||
|
//Путь к закрытому ключу
|
||||||
|
sPathKey: "",
|
||||||
|
//Путь к сертификату
|
||||||
|
sPathCert: ""
|
||||||
|
}
|
||||||
}
|
}
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
//----------------------
|
//----------------------
|
||||||
|
|
||||||
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
|
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
|
||||||
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
|
const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
|
||||||
|
|
||||||
//------------
|
//------------
|
||||||
// Тело модуля
|
// Тело модуля
|
||||||
@ -25,7 +25,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
|
|||||||
...auth
|
...auth
|
||||||
});
|
});
|
||||||
//Инициализируем продюсера
|
//Инициализируем продюсера
|
||||||
let producer = kafka.producer();
|
let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
|
||||||
//Подключаемся к Kafka
|
//Подключаемся к Kafka
|
||||||
await producer.connect();
|
await producer.connect();
|
||||||
//Отправляем сообщение
|
//Отправляем сообщение
|
||||||
@ -36,7 +36,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
|
|||||||
return { statusCode: 200 };
|
return { statusCode: 200 };
|
||||||
};
|
};
|
||||||
|
|
||||||
//Получение MQTT сообщений
|
//Получение Kafka сообщений
|
||||||
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
|
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
|
||||||
try {
|
try {
|
||||||
//Признак необходимости вывода сообщения о потере соединения
|
//Признак необходимости вывода сообщения о потере соединения
|
||||||
@ -48,7 +48,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
|
|||||||
clientId: settings.sClientIdRecipient,
|
clientId: settings.sClientIdRecipient,
|
||||||
brokers: [sBroker],
|
brokers: [sBroker],
|
||||||
connectionTimeout: settings.nConnectionTimeout,
|
connectionTimeout: settings.nConnectionTimeout,
|
||||||
...getKafkaAuth(service.sSrvUser, service.sSrvPass),
|
...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
|
||||||
logLevel: logLevel.NOTHING,
|
logLevel: logLevel.NOTHING,
|
||||||
retry: {
|
retry: {
|
||||||
retries: 0,
|
retries: 0,
|
||||||
|
@ -158,8 +158,8 @@ const appProcess = async prms => {
|
|||||||
options.url = getKafkaBroker(prms.service.sSrvRoot);
|
options.url = getKafkaBroker(prms.service.sSrvRoot);
|
||||||
options.body = prms.queue.blMsg;
|
options.body = prms.queue.blMsg;
|
||||||
options.topic = prms.function.sFnURL;
|
options.topic = prms.function.sFnURL;
|
||||||
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass);
|
|
||||||
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
|
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
|
||||||
|
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass, options.settings);
|
||||||
//Если параметры подключения не считаны
|
//Если параметры подключения не считаны
|
||||||
if (!options.settings) {
|
if (!options.settings) {
|
||||||
//Расскажем об ошибке считывания
|
//Расскажем об ошибке считывания
|
||||||
@ -321,6 +321,7 @@ const appProcess = async prms => {
|
|||||||
delete tmpOptions.body;
|
delete tmpOptions.body;
|
||||||
delete tmpOptions.cert;
|
delete tmpOptions.cert;
|
||||||
delete tmpOptions.key;
|
delete tmpOptions.key;
|
||||||
|
delete tmpOptions.auth;
|
||||||
//Конвертируем в XML
|
//Конвертируем в XML
|
||||||
let sOptions = buildOptionsXML({ options: tmpOptions });
|
let sOptions = buildOptionsXML({ options: tmpOptions });
|
||||||
//Сохраняемв БД
|
//Сохраняемв БД
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
// Подключение библиотек
|
// Подключение библиотек
|
||||||
//----------------------
|
//----------------------
|
||||||
|
|
||||||
|
const fs = require("fs"); //Работа с файлами
|
||||||
const _ = require("lodash"); //Работа с массивами и объектами
|
const _ = require("lodash"); //Работа с массивами и объектами
|
||||||
const os = require("os"); //Средства операционной системы
|
const os = require("os"); //Средства операционной системы
|
||||||
const xml2js = require("xml2js"); //Конвертация XML в JSON
|
const xml2js = require("xml2js"); //Конвертация XML в JSON
|
||||||
@ -379,7 +380,20 @@ const getKafkaBroker = sURL => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
//Получение авторизации для Kafka
|
//Получение авторизации для Kafka
|
||||||
const getKafkaAuth = (sUser, sPass) => {
|
const getKafkaAuth = (sUser, sPass, kafka) => {
|
||||||
|
//Если аутентификация по SSL-сертификату
|
||||||
|
if (kafka.bAuthSSL) {
|
||||||
|
//Возвращаем авторизацию в формате SSL
|
||||||
|
return {
|
||||||
|
ssl: {
|
||||||
|
rejectUnauthorized: kafka.ssl.bRejectUnauthorized,
|
||||||
|
ca: kafka.ssl.sPathCa ? [fs.readFileSync(kafka.ssl.sPathCa, "utf-8")] : [],
|
||||||
|
key: kafka.ssl.sPathKey ? fs.readFileSync(kafka.ssl.sPathKey, "utf-8") : "",
|
||||||
|
cert: kafka.ssl.sPathCert ? fs.readFileSync(kafka.ssl.sPathCert, "utf-8") : ""
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
//Возвращаем авторизацию по пользователю, если необходимо
|
||||||
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
|
return sUser ? { ssl: true, sasl: { mechanism: "plain", username: sUser, password: sPass } } : null;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user