/* Сервис интеграции ПП Парус 8 с WEB API Модуль ядра: обработчик kafka сообщений */ //---------------------- // Подключение библиотек //---------------------- const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka const { SERR_KAFKA_GROUP_UNAVAILABLE, SERR_KAFKA, SWARN_KAFKA, SINFO_KAFKA } = require("./constants"); //Глобальные константы const { ServerError } = require("./server_errors"); //Типовая ошибка //---------- // Константы //---------- //Общие константы работы Kafka const NCHECK_GROUP_RETRIES = 1; //Количество попыток подключения для проверки доступности группы //-------------------------- // Вспомогательные функции //-------------------------- //Проверка доступности группы const checkGroupAvailable = async (clientProps, groupId) => { //Иницализируем подключение к Kafka let client = new Kafka({ ...clientProps, retry: { retries: NCHECK_GROUP_RETRIES } }); //Инициализируем доступ к командам const admin = client.admin(); //Подключаемся await admin.connect(); //Считываем информацию о группе const groupInfo = await admin.describeGroups([groupId]); //Отключаемся await admin.disconnect(); //Если в данной группе есть участники if (groupInfo?.groups[0]?.members && groupInfo.groups[0].members.length !== 0) { //Сообщаем о невозможности запустить сервис throw new ServerError(SERR_KAFKA_GROUP_UNAVAILABLE, `${SERR_KAFKA}: Группа получателя "${groupId}" активна.`); } }; //Логгер для вывода внутренних сообщений Kafka в общий поток const KafkaLogger = selfLogger => { return level => { return async ({ log }) => { //Считываем текст сообщения и доп. информацию const { message, ...logFullInfo } = log; //Убираем лишнюю информацию из доп. информации const { stack, timestamp, logger, ...logInfo } = logFullInfo; //Исходим от уровня ошибки switch (level) { //Ошибка case logLevel.ERROR: await selfLogger.error(`${SERR_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); break; //Предупреждение case logLevel.WARN: await selfLogger.warn(`${SWARN_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); break; //Информация case logLevel.INFO: await selfLogger.info(`${SINFO_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); break; } }; }; }; //------------ // Тело модуля //------------ //Отправка сообщения Kafka const publishKafka = async ({ settings, url, auth, topic, message, logger }) => { //Иницализируем подключение к Kafka let kafka = new Kafka({ clientId: settings.sClientIdSender, brokers: [url], connectionTimeout: settings.nConnectionTimeout, logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, logCreator: KafkaLogger(logger), ...auth }); //Инициализируем продюсера let producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner }); //Подключаемся к Kafka await producer.connect(); //Отправляем сообщение await producer.send({ topic: topic, messages: [{ value: message }] }); //Отключаемся await producer.disconnect(); //Возвращаем статус успешной отправки return { statusCode: 200 }; }; //Получение Kafka сообщений const subscribeKafka = async ({ settings, service, processMessage, logger }) => { try { //Признак необходимости вывода сообщения о потере соединения let bLogLostConnection = true; //Получаем брокера по URL сервиса let sBroker = getKafkaBroker(service.sSrvRoot); //Формируем свойства подключения к Kafka let clientProps = { clientId: settings.sClientIdRecipient, brokers: [sBroker], connectionTimeout: settings.nConnectionTimeout, ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, logCreator: KafkaLogger(logger) }; //Проверка доступности группы await checkGroupAvailable(clientProps, settings.sGroupId); //Иницализируем подключение к Kafka let client = new Kafka({ ...clientProps, retry: { retries: 0, maxRetryTime: settings.nMaxRetryTime, initialRetryTime: settings.nInitialRetryTime, restartOnFailure: error => { return new Promise(resolve => { //Если требуется вывести ошибку if (bLogLostConnection) { //Выводим ошибку logger.error(`${SERR_KAFKA}: Соединение потеряно (${sBroker}): ${makeErrorText(error)}`); //Сбрасываем признак необходимости вывода ошибки bLogLostConnection = false; } resolve(settings.bRestartOnFailure); }); } } }); //Инициализируем получателя let consumer = client.consumer({ groupId: settings.sGroupId }); //Устанавливаем прослушивание await consumer.connect(); consumer.subscribe({ topics: service.functions.map(fn => { return fn.sFnURL; }) }); //Запускаем прослушивание необходимых топиков consumer.run({ eachMessage: async ({ topic, message }) => { try { //Вызываем обработчик processMessage({ message, service, fn: service.functions.find(fn => { return fn.sFnURL === topic; }) }); } catch (e) { await logger.error(`${SERR_KAFKA}: Ошибка обработки входящего сообщения: ${makeErrorText(e)}`); } } }); //Отслеживаем соединение consumer.on(consumer.events.CONNECT, () => { //Если сообщение о потере соединения уже выводилось if (!bLogLostConnection) { //Сообщим о восстановлении соединения logger.info(`${SINFO_KAFKA}: Соединение восстановлено (${sBroker})`); //Устанавливаем признак сообщения о потере соединения bLogLostConnection = true; } }); //Возвращаем соединение return consumer; } catch (e) { //Если это фатальная ошибка - выдаем её if (e.sCode === SERR_KAFKA_GROUP_UNAVAILABLE) { throw new ServerError(e.sCode, e.sMessage); } else { //Если ошибка не фатальная - выводим информацию await logger.error(`${SERR_KAFKA}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); } } }; //----------------- // Интерфейс модуля //----------------- exports.publishKafka = publishKafka; exports.subscribeKafka = subscribeKafka;