From b573b16000ff04b2c4071980ac1c7c42a74cc632 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Mon, 8 Dec 2025 15:49:30 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-1012=20-=20=D0=94?= =?UTF-8?q?=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D1=8B=20=D0=BF=D0=B0?= =?UTF-8?q?=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D1=8B=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=B8=D1=80=D0=BE=D0=B2=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=B4=D0=BB=D1=8F=20=D0=B1=D1=80=D0=BE?= =?UTF-8?q?=D0=BA=D0=B5=D1=80=D0=BE=D0=B2=20MQTT/KAFKA,=20=D1=83=D1=81?= =?UTF-8?q?=D1=82=D1=80=D0=B0=D0=BD=D0=B5=D0=BD=D0=B0=20=D0=BE=D1=88=D0=B8?= =?UTF-8?q?=D0=B1=D0=BA=D0=B0=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D1=8F=20=D1=81=20=D0=B8=D1=81=D0=BF=D0=BE?= =?UTF-8?q?=D0=BB=D1=8C=D0=B7=D1=83=D1=8E=D1=89=D0=B8=D0=BC=D1=81=D1=8F=20?= =?UTF-8?q?sGroupId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 6 ++- core/constants.js | 10 ++++ core/kafka_connector.js | 94 +++++++++++++++++++++++++++++++++---- core/mqtt_connector.js | 43 ++++++++++++++--- core/out_queue_processor.js | 6 ++- 5 files changed, 141 insertions(+), 18 deletions(-) diff --git a/config.js b/config.js index 7bd3b7a..9395326 100644 --- a/config.js +++ b/config.js @@ -94,6 +94,8 @@ const kafka = [ nMaxRetryTime: 20000, //Время ожидания между попытками переподключения (мс) nInitialRetryTime: 10000, + //Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, WARN - предупреждения, INFO - общая информация) + sLogLevel: "NOTHING", //Использовать аутентификацию по SSL-сертификату bAuthSSL: false, //Параметры аутентификации по SSL-сертификату @@ -122,7 +124,9 @@ const mqtt = [ //Время ожидания успешного подключения (мс) nConnectTimeout: 5000, //Время ожидания между попытками переподключения (мс) - nReconnectPeriod: 10000 + nReconnectPeriod: 10000, + //Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, INFO - общая информация) + sLogLevel: "NOTHING" } ]; diff --git a/core/constants.js b/core/constants.js index 421d22e..b03d566 100644 --- a/core/constants.js +++ b/core/constants.js @@ -38,6 +38,16 @@ exports.SERR_APP_SERVER_BEFORE = "ERR_APP_SERVER_BEFORE"; //Ошибка пре exports.SERR_APP_SERVER_AFTER = "ERR_APP_SERVER_AFTER"; //Ошибка постобработчика exports.SERR_DB_SERVER = "ERR_DB_SERVER"; //Ошибка обработчика сервера БД +//Типовые коды ошибок брокера сообщений Kafka +exports.SERR_KAFKA_GROUP_UNAVAILABLE = "ERR_KAFKA_GROUP_UNAVAILABLE"; //Группа получателя недоступна +exports.SERR_KAFKA = "ERR_KAFKA"; //Ошибка +exports.SWARN_KAFKA = "WARN_KAFKA"; //Предупреждение +exports.SINFO_KAFKA = "INFO_KAFKA"; //Информация + +//Типовые коды MQTT +exports.SERR_MQTT = "ERR_MQTT"; //Ошибка +exports.SINFO_MQTT = "INFO_MQTT"; //Предупреждение + //Шаблоны подсветки консольных сообщений протокола работы exports.SCONSOLE_LOG_COLOR_PATTERN_ERR = "\x1b[31m%s\x1b[0m%s"; //Цвет для ошибок exports.SCONSOLE_LOG_COLOR_PATTERN_WRN = "\x1b[33m%s\x1b[0m%s"; //Цвет для предупреждений diff --git a/core/kafka_connector.js b/core/kafka_connector.js index 1ba72a0..990b8e6 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -9,19 +9,82 @@ const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka +const { SERR_KAFKA_GROUP_UNAVAILABLE, SERR_KAFKA, SWARN_KAFKA, SINFO_KAFKA } = require("./constants"); //Глобальные константы +const { ServerError } = require("./server_errors"); //Типовая ошибка + +//---------- +// Константы +//---------- + +//Общие константы работы Kafka +const NCHECK_GROUP_RETRIES = 1; //Количество попыток подключения для проверки доступности группы + +//-------------------------- +// Вспомогательные функции +//-------------------------- + +//Проверка доступности группы +const checkGroupAvailable = async (clientProps, groupId) => { + //Иницализируем подключение к Kafka + let client = new Kafka({ + ...clientProps, + retry: { retries: NCHECK_GROUP_RETRIES } + }); + //Инициализируем доступ к командам + const admin = client.admin(); + //Подключаемся + await admin.connect(); + //Считываем информацию о группе + const groupInfo = await admin.describeGroups([groupId]); + //Отключаемся + await admin.disconnect(); + //Если в данной группе есть участники + if (groupInfo?.groups[0]?.members && groupInfo.groups[0].members.length !== 0) { + //Сообщаем о невозможности запустить сервис + throw new ServerError(SERR_KAFKA_GROUP_UNAVAILABLE, `${SERR_KAFKA}: Группа получателя "${groupId}" активна.`); + } +}; + +//Логгер для вывода внутренних сообщений Kafka в общий поток +const KafkaLogger = selfLogger => { + return level => { + return async ({ log }) => { + //Считываем текст сообщения и доп. информацию + const { message, ...logFullInfo } = log; + //Убираем лишнюю информацию из доп. информации + const { stack, timestamp, logger, ...logInfo } = logFullInfo; + //Исходим от уровня ошибки + switch (level) { + //Ошибка + case logLevel.ERROR: + await selfLogger.error(`${SERR_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + //Предупреждение + case logLevel.WARN: + await selfLogger.warn(`${SWARN_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + //Информация + case logLevel.INFO: + await selfLogger.info(`${SINFO_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + } + }; + }; +}; //------------ // Тело модуля //------------ //Отправка сообщения Kafka -const publishKafka = async ({ settings, url, auth, topic, message }) => { +const publishKafka = async ({ settings, url, auth, topic, message, logger }) => { //Иницализируем подключение к Kafka let kafka = new Kafka({ clientId: settings.sClientIdSender, brokers: [url], connectionTimeout: settings.nConnectionTimeout, - logLevel: logLevel.NOTHING, + logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, + logCreator: KafkaLogger(logger), ...auth }); //Инициализируем продюсера @@ -43,13 +106,20 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => let bLogLostConnection = true; //Получаем брокера по URL сервиса let sBroker = getKafkaBroker(service.sSrvRoot); - //Иницализируем подключение к Kafka - let client = new Kafka({ + //Формируем свойства подключения к Kafka + let clientProps = { clientId: settings.sClientIdRecipient, brokers: [sBroker], connectionTimeout: settings.nConnectionTimeout, ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), - logLevel: logLevel.NOTHING, + logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, + logCreator: KafkaLogger(logger) + }; + //Проверка доступности группы + await checkGroupAvailable(clientProps, settings.sGroupId); + //Иницализируем подключение к Kafka + let client = new Kafka({ + ...clientProps, retry: { retries: 0, maxRetryTime: settings.nMaxRetryTime, @@ -59,7 +129,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Если требуется вывести ошибку if (bLogLostConnection) { //Выводим ошибку - logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`); + logger.error(`${SERR_KAFKA}: Соединение потеряно (${sBroker}): ${makeErrorText(error)}`); //Сбрасываем признак необходимости вывода ошибки bLogLostConnection = false; } @@ -90,7 +160,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => }) }); } catch (e) { - await logger.error(`Ошибка обработки входящего сообщения Kafka: ${makeErrorText(e)}`); + await logger.error(`${SERR_KAFKA}: Ошибка обработки входящего сообщения: ${makeErrorText(e)}`); } } }); @@ -99,7 +169,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Если сообщение о потере соединения уже выводилось if (!bLogLostConnection) { //Сообщим о восстановлении соединения - logger.info(`Соединение с Kafka восстановлено (${sBroker})`); + logger.info(`${SINFO_KAFKA}: Соединение восстановлено (${sBroker})`); //Устанавливаем признак сообщения о потере соединения bLogLostConnection = true; } @@ -107,7 +177,13 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Возвращаем соединение return consumer; } catch (e) { - await logger.error(`Ошибка запуска обработчика очереди входящих сообщений Kafka: ${makeErrorText(e)}`); + //Если это фатальная ошибка - выдаем её + if (e.sCode === SERR_KAFKA_GROUP_UNAVAILABLE) { + throw new ServerError(e.sCode, e.sMessage); + } else { + //Если ошибка не фатальная - выводим информацию + await logger.error(`${SERR_KAFKA}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); + } } }; diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index e0af9e1..d0292aa 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -9,13 +9,22 @@ const { makeErrorText } = require("./utils"); //Вспомогательные функции const mqtt = require("mqtt"); //Работа с MQTT +const { SERR_MQTT, SINFO_MQTT } = require("./constants"); //Глобальные константы + +//---------- +// Константы +//---------- + +//Общие константы работы MQTT +const SLOG_ERROR = "ERROR"; //Уровень протоколирования - ошибки +const SLOG_INFO = "INFO"; //Уровень протоколирования - информация //------------ // Тело модуля //------------ //Отправка MQTT сообщения -const publishMQTT = async ({ settings, url, auth, topic, message }) => { +const publishMQTT = async ({ settings, url, auth, topic, message, logger }) => { //Инициализируем подключение const client = await mqtt.connectAsync(url, { clientId: settings.sClientIdSender, @@ -25,6 +34,14 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => { password: auth.pass, reconnectPeriod: settings.nReconnectPeriod }); + //Прослушиваем ошибки + client.on("error", e => { + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); + } + }); //Отправляем сообщение await client.publishAsync(topic, message); //Закрываем подключение @@ -64,18 +81,32 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { }); //Прослушиваем отключение от сервера client.on("offline", () => { - //Выводим ошибку - logger.error(`Соединение с MQTT потеряно (${sBroker})`); + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`); + } }); //Прослушиваем восстановление соединения client.on("connect", () => { - //Сообщим о восстановлении соединения - logger.info(`Соединение с MQTT восстановлено (${sBroker})`); + //Если требуется выдавать предупреждение + if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) { + //Сообщим о восстановлении соединения + logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`); + } + }); + //Прослушиваем ошибки + client.on("error", e => { + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); + } }); //Возвращаем подключение return client; } catch (e) { - logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`); + logger.error(`${SERR_MQTT}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); } }; diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3583925..4d3f895 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -343,7 +343,8 @@ const appProcess = async prms => { url: options.url, auth: options.auth, topic: options.topic, - message: options.body + message: options.body, + logger }); break; //MQTT/MQTTS @@ -353,7 +354,8 @@ const appProcess = async prms => { url: options.url, auth: options.auth, topic: options.topic, - message: options.body + message: options.body, + logger }); break; //HTTP/HTTPS