ЦИТК-1012 - Добавлены параметры протоколирования для брокеров MQTT/KAFKA, устранена ошибка подключения с использующимся sGroupId #12

Open
Dollerok wants to merge 1 commits from Dollerok/P8-ExchangeService:master into master
5 changed files with 141 additions and 18 deletions

View File

@ -94,6 +94,8 @@ const kafka = [
nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000,
//Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, WARN - предупреждения, INFO - общая информация)
sLogLevel: "NOTHING",
//Использовать аутентификацию по SSL-сертификату
bAuthSSL: false,
//Параметры аутентификации по SSL-сертификату
@ -122,7 +124,9 @@ const mqtt = [
//Время ожидания успешного подключения (мс)
nConnectTimeout: 5000,
//Время ожидания между попытками переподключения (мс)
nReconnectPeriod: 10000
nReconnectPeriod: 10000,
//Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, INFO - общая информация)
sLogLevel: "NOTHING"
}
];

View File

@ -38,6 +38,16 @@ exports.SERR_APP_SERVER_BEFORE = "ERR_APP_SERVER_BEFORE"; //Ошибка пре
exports.SERR_APP_SERVER_AFTER = "ERR_APP_SERVER_AFTER"; //Ошибка постобработчика
exports.SERR_DB_SERVER = "ERR_DB_SERVER"; //Ошибка обработчика сервера БД
//Типовые коды ошибок брокера сообщений Kafka
exports.SERR_KAFKA_GROUP_UNAVAILABLE = "ERR_KAFKA_GROUP_UNAVAILABLE"; //Группа получателя недоступна
exports.SERR_KAFKA = "ERR_KAFKA"; //Ошибка
exports.SWARN_KAFKA = "WARN_KAFKA"; //Предупреждение
exports.SINFO_KAFKA = "INFO_KAFKA"; //Информация
//Типовые коды MQTT
exports.SERR_MQTT = "ERR_MQTT"; //Ошибка
exports.SINFO_MQTT = "INFO_MQTT"; //Предупреждение
//Шаблоны подсветки консольных сообщений протокола работы
exports.SCONSOLE_LOG_COLOR_PATTERN_ERR = "\x1b[31m%s\x1b[0m%s"; //Цвет для ошибок
exports.SCONSOLE_LOG_COLOR_PATTERN_WRN = "\x1b[33m%s\x1b[0m%s"; //Цвет для предупреждений

View File

@ -9,19 +9,82 @@
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
const { SERR_KAFKA_GROUP_UNAVAILABLE, SERR_KAFKA, SWARN_KAFKA, SINFO_KAFKA } = require("./constants"); //Глобальные константы
const { ServerError } = require("./server_errors"); //Типовая ошибка
//----------
// Константы
//----------
//Общие константы работы Kafka
const NCHECK_GROUP_RETRIES = 1; //Количество попыток подключения для проверки доступности группы
//--------------------------
// Вспомогательные функции
//--------------------------
//Проверка доступности группы
const checkGroupAvailable = async (clientProps, groupId) => {
//Иницализируем подключение к Kafka
let client = new Kafka({
...clientProps,
retry: { retries: NCHECK_GROUP_RETRIES }
});
//Инициализируем доступ к командам
const admin = client.admin();
//Подключаемся
await admin.connect();
//Считываем информацию о группе
const groupInfo = await admin.describeGroups([groupId]);
//Отключаемся
await admin.disconnect();
//Если в данной группе есть участники
if (groupInfo?.groups[0]?.members && groupInfo.groups[0].members.length !== 0) {
//Сообщаем о невозможности запустить сервис
throw new ServerError(SERR_KAFKA_GROUP_UNAVAILABLE, `${SERR_KAFKA}: Группа получателя "${groupId}" активна.`);
}
};
//Логгер для вывода внутренних сообщений Kafka в общий поток
const KafkaLogger = selfLogger => {
return level => {
return async ({ log }) => {
//Считываем текст сообщения и доп. информацию
const { message, ...logFullInfo } = log;
//Убираем лишнюю информацию из доп. информации
const { stack, timestamp, logger, ...logInfo } = logFullInfo;
//Исходим от уровня ошибки
switch (level) {
//Ошибка
case logLevel.ERROR:
await selfLogger.error(`${SERR_KAFKA}: ${message} ${JSON.stringify(logInfo)}`);
break;
//Предупреждение
case logLevel.WARN:
await selfLogger.warn(`${SWARN_KAFKA}: ${message} ${JSON.stringify(logInfo)}`);
break;
//Информация
case logLevel.INFO:
await selfLogger.info(`${SINFO_KAFKA}: ${message} ${JSON.stringify(logInfo)}`);
break;
}
};
};
};
//------------
// Тело модуля
//------------
//Отправка сообщения Kafka
const publishKafka = async ({ settings, url, auth, topic, message }) => {
const publishKafka = async ({ settings, url, auth, topic, message, logger }) => {
//Иницализируем подключение к Kafka
let kafka = new Kafka({
clientId: settings.sClientIdSender,
brokers: [url],
connectionTimeout: settings.nConnectionTimeout,
logLevel: logLevel.NOTHING,
logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING,
logCreator: KafkaLogger(logger),
...auth
});
//Инициализируем продюсера
@ -43,13 +106,20 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
let bLogLostConnection = true;
//Получаем брокера по URL сервиса
let sBroker = getKafkaBroker(service.sSrvRoot);
//Иницализируем подключение к Kafka
let client = new Kafka({
//Формируем свойства подключения к Kafka
let clientProps = {
clientId: settings.sClientIdRecipient,
brokers: [sBroker],
connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
logLevel: logLevel.NOTHING,
logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING,
logCreator: KafkaLogger(logger)
};
//Проверка доступности группы
await checkGroupAvailable(clientProps, settings.sGroupId);
//Иницализируем подключение к Kafka
let client = new Kafka({
...clientProps,
retry: {
retries: 0,
maxRetryTime: settings.nMaxRetryTime,
@ -59,7 +129,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Если требуется вывести ошибку
if (bLogLostConnection) {
//Выводим ошибку
logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`);
logger.error(`${SERR_KAFKA}: Соединение потеряно (${sBroker}): ${makeErrorText(error)}`);
//Сбрасываем признак необходимости вывода ошибки
bLogLostConnection = false;
}
@ -90,7 +160,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
})
});
} catch (e) {
await logger.error(`Ошибка обработки входящего сообщения Kafka: ${makeErrorText(e)}`);
await logger.error(`${SERR_KAFKA}: Ошибка обработки входящего сообщения: ${makeErrorText(e)}`);
}
}
});
@ -99,7 +169,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Если сообщение о потере соединения уже выводилось
if (!bLogLostConnection) {
//Сообщим о восстановлении соединения
logger.info(`Соединение с Kafka восстановлено (${sBroker})`);
logger.info(`${SINFO_KAFKA}: Соединение восстановлено (${sBroker})`);
//Устанавливаем признак сообщения о потере соединения
bLogLostConnection = true;
}
@ -107,7 +177,13 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Возвращаем соединение
return consumer;
} catch (e) {
await logger.error(`Ошибка запуска обработчика очереди входящих сообщений Kafka: ${makeErrorText(e)}`);
//Если это фатальная ошибка - выдаем её
if (e.sCode === SERR_KAFKA_GROUP_UNAVAILABLE) {
throw new ServerError(e.sCode, e.sMessage);
} else {
//Если ошибка не фатальная - выводим информацию
await logger.error(`${SERR_KAFKA}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
}
}
};

View File

@ -9,13 +9,22 @@
const { makeErrorText } = require("./utils"); //Вспомогательные функции
const mqtt = require("mqtt"); //Работа с MQTT
const { SERR_MQTT, SINFO_MQTT } = require("./constants"); //Глобальные константы
//----------
// Константы
//----------
//Общие константы работы MQTT
const SLOG_ERROR = "ERROR"; //Уровень протоколирования - ошибки
const SLOG_INFO = "INFO"; //Уровень протоколирования - информация
//------------
// Тело модуля
//------------
//Отправка MQTT сообщения
const publishMQTT = async ({ settings, url, auth, topic, message }) => {
const publishMQTT = async ({ settings, url, auth, topic, message, logger }) => {
//Инициализируем подключение
const client = await mqtt.connectAsync(url, {
clientId: settings.sClientIdSender,
@ -25,6 +34,14 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => {
password: auth.pass,
reconnectPeriod: settings.nReconnectPeriod
});
//Прослушиваем ошибки
client.on("error", e => {
//Если требуется выдавать ошибку
if (settings.sLogLevel === SLOG_ERROR) {
//Выводим ошибку
logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`);
}
});
//Отправляем сообщение
await client.publishAsync(topic, message);
//Закрываем подключение
@ -64,18 +81,32 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
});
//Прослушиваем отключение от сервера
client.on("offline", () => {
//Выводим ошибку
logger.error(`Соединение с MQTT потеряно (${sBroker})`);
//Если требуется выдавать ошибку
if (settings.sLogLevel === SLOG_ERROR) {
//Выводим ошибку
logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`);
}
});
//Прослушиваем восстановление соединения
client.on("connect", () => {
//Сообщим о восстановлении соединения
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
//Если требуется выдавать предупреждение
if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) {
//Сообщим о восстановлении соединения
logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`);
}
});
//Прослушиваем ошибки
client.on("error", e => {
//Если требуется выдавать ошибку
if (settings.sLogLevel === SLOG_ERROR) {
//Выводим ошибку
logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`);
}
});
//Возвращаем подключение
return client;
} catch (e) {
logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`);
logger.error(`${SERR_MQTT}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
}
};

View File

@ -343,7 +343,8 @@ const appProcess = async prms => {
url: options.url,
auth: options.auth,
topic: options.topic,
message: options.body
message: options.body,
logger
});
break;
//MQTT/MQTTS
@ -353,7 +354,8 @@ const appProcess = async prms => {
url: options.url,
auth: options.auth,
topic: options.topic,
message: options.body
message: options.body,
logger
});
break;
//HTTP/HTTPS