/* Сервис интеграции ПП Парус 8 с WEB API Модуль ядра: обработчик kafka сообщений */ //---------------------- // Подключение библиотек //---------------------- const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka //------------ // Тело модуля //------------ //Отправка сообщения Kafka const publishKafka = async ({ settings, url, auth, topic, message }) => { //Иницализируем подключение к Kafka let kafka = new Kafka({ clientId: settings.sClientIdSender, brokers: [url], connectionTimeout: settings.nConnectionTimeout, logLevel: logLevel.NOTHING, ...auth }); //Инициализируем продюсера let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); //Подключаемся к Kafka await producer.connect(); //Отправляем сообщение await producer.send({ topic: topic, messages: [{ value: message }] }); //Отключаемся await producer.disconnect(); //Возвращаем статус успешной отправки return { statusCode: 200 }; }; //Получение Kafka сообщений const subscribeKafka = async ({ settings, service, processMessage, logger }) => { try { //Признак необходимости вывода сообщения о потере соединения let bLogLostConnection = true; //Получаем брокера по URL сервиса let sBroker = getKafkaBroker(service.sSrvRoot); //Иницализируем подключение к Kafka let client = new Kafka({ clientId: settings.sClientIdRecipient, brokers: [sBroker], connectionTimeout: settings.nConnectionTimeout, ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), logLevel: logLevel.NOTHING, retry: { retries: 0, maxRetryTime: settings.nMaxRetryTime, initialRetryTime: settings.nInitialRetryTime, restartOnFailure: error => { return new Promise(resolve => { //Если требуется вывести ошибку if (bLogLostConnection) { //Выводим ошибку logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`); //Сбрасываем признак необходимости вывода ошибки bLogLostConnection = false; } resolve(settings.bRestartOnFailure); }); } } }); //Инициализируем получателя let consumer = client.consumer({ groupId: settings.sGroupId }); //Устанавливаем прослушивание await consumer.connect(); consumer.subscribe({ topics: service.functions.map(fn => { return fn.sFnURL; }) }); //Запускаем прослушивание необходимых топиков consumer.run({ eachMessage: async ({ topic, message }) => { try { //Вызываем обработчик processMessage({ message, service, fn: service.functions.find(fn => { return fn.sFnURL === topic; }) }); } catch (e) { await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`); } } }); //Отслеживаем соединение consumer.on(consumer.events.CONNECT, () => { //Если сообщение о потере соединения уже выводилось if (!bLogLostConnection) { //Сообщим о восстановлении соединения logger.info(`Соединение с Kafka восстановлено (${sBroker})`); //Устанавливаем признак сообщения о потере соединения bLogLostConnection = true; } }); //Возвращаем соединение return consumer; } catch (e) { await logger.error(`Ошибка запуска обработчика очереди исходящих сообщений Kafka: ${makeErrorText(e)}`); } }; //----------------- // Интерфейс модуля //----------------- exports.publishKafka = publishKafka; exports.subscribeKafka = subscribeKafka;