120 lines
5.1 KiB
JavaScript
120 lines
5.1 KiB
JavaScript
/*
|
||
Сервис интеграции ПП Парус 8 с WEB API
|
||
Модуль ядра: обработчик kafka сообщений
|
||
*/
|
||
|
||
//----------------------
|
||
// Подключение библиотек
|
||
//----------------------
|
||
|
||
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
|
||
const { Kafka, logLevel } = require("kafkajs"); //Работа с Kafka
|
||
|
||
//------------
|
||
// Тело модуля
|
||
//------------
|
||
|
||
//Отправка сообщения Kafka
|
||
const publishKafka = async ({ settings, url, auth, topic, message }) => {
|
||
//Иницализируем подключение к Kafka
|
||
let kafka = new Kafka({
|
||
clientId: settings.sClientIdSender,
|
||
brokers: [url],
|
||
connectionTimeout: settings.nConnectionTimeout,
|
||
logLevel: logLevel.NOTHING,
|
||
...auth
|
||
});
|
||
//Инициализируем продюсера
|
||
let producer = kafka.producer();
|
||
//Подключаемся к Kafka
|
||
await producer.connect();
|
||
//Отправляем сообщение
|
||
await producer.send({ topic: topic, messages: [{ value: message }] });
|
||
//Отключаемся
|
||
await producer.disconnect();
|
||
//Возвращаем статус успешной отправки
|
||
return { statusCode: 200 };
|
||
};
|
||
|
||
//Получение MQTT сообщений
|
||
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
|
||
try {
|
||
//Признак необходимости вывода сообщения о потере соединения
|
||
let bLogLostConnection = true;
|
||
//Получаем брокера по URL сервиса
|
||
let sBroker = getKafkaBroker(service.sSrvRoot);
|
||
//Иницализируем подключение к Kafka
|
||
let client = new Kafka({
|
||
clientId: settings.sClientIdRecipient,
|
||
brokers: [sBroker],
|
||
connectionTimeout: settings.nConnectionTimeout,
|
||
...getKafkaAuth(service.sSrvUser, service.sSrvPass),
|
||
logLevel: logLevel.NOTHING,
|
||
retry: {
|
||
retries: 0,
|
||
maxRetryTime: settings.nMaxRetryTime,
|
||
initialRetryTime: settings.nInitialRetryTime,
|
||
restartOnFailure: error => {
|
||
return new Promise(resolve => {
|
||
//Если требуется вывести ошибку
|
||
if (bLogLostConnection) {
|
||
//Выводим ошибку
|
||
logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`);
|
||
//Сбрасываем признак необходимости вывода ошибки
|
||
bLogLostConnection = false;
|
||
}
|
||
resolve(settings.bRestartOnFailure);
|
||
});
|
||
}
|
||
}
|
||
});
|
||
//Инициализируем получателя
|
||
let consumer = client.consumer({ groupId: settings.sGroupId });
|
||
//Устанавливаем прослушивание
|
||
await consumer.connect();
|
||
consumer.subscribe({
|
||
topics: service.functions.map(fn => {
|
||
return fn.sFnURL;
|
||
})
|
||
});
|
||
//Запускаем прослушивание необходимых топиков
|
||
consumer.run({
|
||
eachMessage: async ({ topic, message }) => {
|
||
try {
|
||
//Вызываем обработчик
|
||
processMessage({
|
||
message,
|
||
service,
|
||
fn: service.functions.find(fn => {
|
||
return fn.sFnURL === topic;
|
||
})
|
||
});
|
||
} catch (e) {
|
||
await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`);
|
||
}
|
||
}
|
||
});
|
||
//Отслеживаем соединение
|
||
consumer.on(consumer.events.CONNECT, () => {
|
||
//Если сообщение о потере соединения уже выводилось
|
||
if (!bLogLostConnection) {
|
||
//Сообщим о восстановлении соединения
|
||
logger.info(`Соединение с Kafka восстановлено (${sBroker})`);
|
||
//Устанавливаем признак сообщения о потере соединения
|
||
bLogLostConnection = true;
|
||
}
|
||
});
|
||
//Возвращаем соединение
|
||
return consumer;
|
||
} catch (e) {
|
||
await logger.error(`Ошибка запуска обработчика очереди исходящих сообщений Kafka: ${makeErrorText(e)}`);
|
||
}
|
||
};
|
||
|
||
//-----------------
|
||
// Интерфейс модуля
|
||
//-----------------
|
||
|
||
exports.publishKafka = publishKafka;
|
||
exports.subscribeKafka = subscribeKafka;
|