P8-ExchangeService/core/kafka_connector.js

120 lines
5.1 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: обработчик kafka сообщений
*/
//----------------------
// Подключение библиотек
//----------------------
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
//------------
// Тело модуля
//------------
//Отправка сообщения Kafka
const publishKafka = async ({ settings, url, auth, topic, message }) => {
//Иницализируем подключение к Kafka
let kafka = new Kafka({
clientId: settings.sClientIdSender,
brokers: [url],
connectionTimeout: settings.nConnectionTimeout,
logLevel: logLevel.NOTHING,
...auth
});
//Инициализируем продюсера
let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
//Подключаемся к Kafka
await producer.connect();
//Отправляем сообщение
await producer.send({ topic: topic, messages: [{ value: message }] });
//Отключаемся
await producer.disconnect();
//Возвращаем статус успешной отправки
return { statusCode: 200 };
};
//Получение Kafka сообщений
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
try {
//Признак необходимости вывода сообщения о потере соединения
let bLogLostConnection = true;
//Получаем брокера по URL сервиса
let sBroker = getKafkaBroker(service.sSrvRoot);
//Иницализируем подключение к Kafka
let client = new Kafka({
clientId: settings.sClientIdRecipient,
brokers: [sBroker],
connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
logLevel: logLevel.NOTHING,
retry: {
retries: 0,
maxRetryTime: settings.nMaxRetryTime,
initialRetryTime: settings.nInitialRetryTime,
restartOnFailure: error => {
return new Promise(resolve => {
//Если требуется вывести ошибку
if (bLogLostConnection) {
//Выводим ошибку
logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`);
//Сбрасываем признак необходимости вывода ошибки
bLogLostConnection = false;
}
resolve(settings.bRestartOnFailure);
});
}
}
});
//Инициализируем получателя
let consumer = client.consumer({ groupId: settings.sGroupId });
//Устанавливаем прослушивание
await consumer.connect();
consumer.subscribe({
topics: service.functions.map(fn => {
return fn.sFnURL;
})
});
//Запускаем прослушивание необходимых топиков
consumer.run({
eachMessage: async ({ topic, message }) => {
try {
//Вызываем обработчик
processMessage({
message,
service,
fn: service.functions.find(fn => {
return fn.sFnURL === topic;
})
});
} catch (e) {
await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`);
}
}
});
//Отслеживаем соединение
consumer.on(consumer.events.CONNECT, () => {
//Если сообщение о потере соединения уже выводилось
if (!bLogLostConnection) {
//Сообщим о восстановлении соединения
logger.info(`Соединение с Kafka восстановлено (${sBroker})`);
//Устанавливаем признак сообщения о потере соединения
bLogLostConnection = true;
}
});
//Возвращаем соединение
return consumer;
} catch (e) {
await logger.error(`Ошибка запуска обработчика очереди исходящих сообщений Kafka: ${makeErrorText(e)}`);
}
};
//-----------------
// Интерфейс модуля
//-----------------
exports.publishKafka = publishKafka;
exports.subscribeKafka = subscribeKafka;