diff --git a/config.js b/config.js index 7bd3b7a..9395326 100644 --- a/config.js +++ b/config.js @@ -94,6 +94,8 @@ const kafka = [ nMaxRetryTime: 20000, //Время ожидания между попытками переподключения (мс) nInitialRetryTime: 10000, + //Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, WARN - предупреждения, INFO - общая информация) + sLogLevel: "NOTHING", //Использовать аутентификацию по SSL-сертификату bAuthSSL: false, //Параметры аутентификации по SSL-сертификату @@ -122,7 +124,9 @@ const mqtt = [ //Время ожидания успешного подключения (мс) nConnectTimeout: 5000, //Время ожидания между попытками переподключения (мс) - nReconnectPeriod: 10000 + nReconnectPeriod: 10000, + //Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, INFO - общая информация) + sLogLevel: "NOTHING" } ]; diff --git a/core/constants.js b/core/constants.js index 421d22e..b03d566 100644 --- a/core/constants.js +++ b/core/constants.js @@ -38,6 +38,16 @@ exports.SERR_APP_SERVER_BEFORE = "ERR_APP_SERVER_BEFORE"; //Ошибка пре exports.SERR_APP_SERVER_AFTER = "ERR_APP_SERVER_AFTER"; //Ошибка постобработчика exports.SERR_DB_SERVER = "ERR_DB_SERVER"; //Ошибка обработчика сервера БД +//Типовые коды ошибок брокера сообщений Kafka +exports.SERR_KAFKA_GROUP_UNAVAILABLE = "ERR_KAFKA_GROUP_UNAVAILABLE"; //Группа получателя недоступна +exports.SERR_KAFKA = "ERR_KAFKA"; //Ошибка +exports.SWARN_KAFKA = "WARN_KAFKA"; //Предупреждение +exports.SINFO_KAFKA = "INFO_KAFKA"; //Информация + +//Типовые коды MQTT +exports.SERR_MQTT = "ERR_MQTT"; //Ошибка +exports.SINFO_MQTT = "INFO_MQTT"; //Предупреждение + //Шаблоны подсветки консольных сообщений протокола работы exports.SCONSOLE_LOG_COLOR_PATTERN_ERR = "\x1b[31m%s\x1b[0m%s"; //Цвет для ошибок exports.SCONSOLE_LOG_COLOR_PATTERN_WRN = "\x1b[33m%s\x1b[0m%s"; //Цвет для предупреждений diff --git a/core/kafka_connector.js b/core/kafka_connector.js index 1ba72a0..990b8e6 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -9,19 +9,82 @@ const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka +const { SERR_KAFKA_GROUP_UNAVAILABLE, SERR_KAFKA, SWARN_KAFKA, SINFO_KAFKA } = require("./constants"); //Глобальные константы +const { ServerError } = require("./server_errors"); //Типовая ошибка + +//---------- +// Константы +//---------- + +//Общие константы работы Kafka +const NCHECK_GROUP_RETRIES = 1; //Количество попыток подключения для проверки доступности группы + +//-------------------------- +// Вспомогательные функции +//-------------------------- + +//Проверка доступности группы +const checkGroupAvailable = async (clientProps, groupId) => { + //Иницализируем подключение к Kafka + let client = new Kafka({ + ...clientProps, + retry: { retries: NCHECK_GROUP_RETRIES } + }); + //Инициализируем доступ к командам + const admin = client.admin(); + //Подключаемся + await admin.connect(); + //Считываем информацию о группе + const groupInfo = await admin.describeGroups([groupId]); + //Отключаемся + await admin.disconnect(); + //Если в данной группе есть участники + if (groupInfo?.groups[0]?.members && groupInfo.groups[0].members.length !== 0) { + //Сообщаем о невозможности запустить сервис + throw new ServerError(SERR_KAFKA_GROUP_UNAVAILABLE, `${SERR_KAFKA}: Группа получателя "${groupId}" активна.`); + } +}; + +//Логгер для вывода внутренних сообщений Kafka в общий поток +const KafkaLogger = selfLogger => { + return level => { + return async ({ log }) => { + //Считываем текст сообщения и доп. информацию + const { message, ...logFullInfo } = log; + //Убираем лишнюю информацию из доп. информации + const { stack, timestamp, logger, ...logInfo } = logFullInfo; + //Исходим от уровня ошибки + switch (level) { + //Ошибка + case logLevel.ERROR: + await selfLogger.error(`${SERR_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + //Предупреждение + case logLevel.WARN: + await selfLogger.warn(`${SWARN_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + //Информация + case logLevel.INFO: + await selfLogger.info(`${SINFO_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + } + }; + }; +}; //------------ // Тело модуля //------------ //Отправка сообщения Kafka -const publishKafka = async ({ settings, url, auth, topic, message }) => { +const publishKafka = async ({ settings, url, auth, topic, message, logger }) => { //Иницализируем подключение к Kafka let kafka = new Kafka({ clientId: settings.sClientIdSender, brokers: [url], connectionTimeout: settings.nConnectionTimeout, - logLevel: logLevel.NOTHING, + logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, + logCreator: KafkaLogger(logger), ...auth }); //Инициализируем продюсера @@ -43,13 +106,20 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => let bLogLostConnection = true; //Получаем брокера по URL сервиса let sBroker = getKafkaBroker(service.sSrvRoot); - //Иницализируем подключение к Kafka - let client = new Kafka({ + //Формируем свойства подключения к Kafka + let clientProps = { clientId: settings.sClientIdRecipient, brokers: [sBroker], connectionTimeout: settings.nConnectionTimeout, ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), - logLevel: logLevel.NOTHING, + logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, + logCreator: KafkaLogger(logger) + }; + //Проверка доступности группы + await checkGroupAvailable(clientProps, settings.sGroupId); + //Иницализируем подключение к Kafka + let client = new Kafka({ + ...clientProps, retry: { retries: 0, maxRetryTime: settings.nMaxRetryTime, @@ -59,7 +129,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Если требуется вывести ошибку if (bLogLostConnection) { //Выводим ошибку - logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`); + logger.error(`${SERR_KAFKA}: Соединение потеряно (${sBroker}): ${makeErrorText(error)}`); //Сбрасываем признак необходимости вывода ошибки bLogLostConnection = false; } @@ -90,7 +160,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => }) }); } catch (e) { - await logger.error(`Ошибка обработки входящего сообщения Kafka: ${makeErrorText(e)}`); + await logger.error(`${SERR_KAFKA}: Ошибка обработки входящего сообщения: ${makeErrorText(e)}`); } } }); @@ -99,7 +169,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Если сообщение о потере соединения уже выводилось if (!bLogLostConnection) { //Сообщим о восстановлении соединения - logger.info(`Соединение с Kafka восстановлено (${sBroker})`); + logger.info(`${SINFO_KAFKA}: Соединение восстановлено (${sBroker})`); //Устанавливаем признак сообщения о потере соединения bLogLostConnection = true; } @@ -107,7 +177,13 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Возвращаем соединение return consumer; } catch (e) { - await logger.error(`Ошибка запуска обработчика очереди входящих сообщений Kafka: ${makeErrorText(e)}`); + //Если это фатальная ошибка - выдаем её + if (e.sCode === SERR_KAFKA_GROUP_UNAVAILABLE) { + throw new ServerError(e.sCode, e.sMessage); + } else { + //Если ошибка не фатальная - выводим информацию + await logger.error(`${SERR_KAFKA}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); + } } }; diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index e0af9e1..898a107 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -9,13 +9,22 @@ const { makeErrorText } = require("./utils"); //Вспомогательные функции const mqtt = require("mqtt"); //Работа с MQTT +const { SERR_MQTT, SINFO_MQTT } = require("./constants"); //Глобальные константы + +//---------- +// Константы +//---------- + +//Общие константы работы MQTT +const SLOG_ERROR = "ERROR"; //Уровень протоколирования - ошибки +const SLOG_INFO = "INFO"; //Уровень протоколирования - информация //------------ // Тело модуля //------------ //Отправка MQTT сообщения -const publishMQTT = async ({ settings, url, auth, topic, message }) => { +const publishMQTT = async ({ settings, url, auth, topic, message, logger }) => { //Инициализируем подключение const client = await mqtt.connectAsync(url, { clientId: settings.sClientIdSender, @@ -25,6 +34,14 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => { password: auth.pass, reconnectPeriod: settings.nReconnectPeriod }); + //Прослушиваем ошибки + client.on("error", e => { + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); + } + }); //Отправляем сообщение await client.publishAsync(topic, message); //Закрываем подключение @@ -64,18 +81,32 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { }); //Прослушиваем отключение от сервера client.on("offline", () => { - //Выводим ошибку - logger.error(`Соединение с MQTT потеряно (${sBroker})`); + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_INFO) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`); + } }); //Прослушиваем восстановление соединения client.on("connect", () => { - //Сообщим о восстановлении соединения - logger.info(`Соединение с MQTT восстановлено (${sBroker})`); + //Если требуется выдавать предупреждение + if (settings.sLogLevel === SLOG_INFO) { + //Сообщим о восстановлении соединения + logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`); + } + }); + //Прослушиваем ошибки + client.on("error", e => { + //Если требуется выдавать ошибку + if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); + } }); //Возвращаем подключение return client; } catch (e) { - logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`); + logger.error(`${SERR_MQTT}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); } }; diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3583925..4d3f895 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -343,7 +343,8 @@ const appProcess = async prms => { url: options.url, auth: options.auth, topic: options.topic, - message: options.body + message: options.body, + logger }); break; //MQTT/MQTTS @@ -353,7 +354,8 @@ const appProcess = async prms => { url: options.url, auth: options.auth, topic: options.topic, - message: options.body + message: options.body, + logger }); break; //HTTP/HTTPS diff --git a/models/obj_config.js b/models/obj_config.js index f17d7d4..7cea9df 100644 --- a/models/obj_config.js +++ b/models/obj_config.js @@ -9,6 +9,21 @@ const Schema = require("validate"); //Схемы валидации +//---------- +// Константы +//---------- + +//Уровни протоколирования подключения Kafka +const SKAFKA_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено +const SKAFKA_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок +const SKAFKA_LOG_LEVEL_WARN = "WARN"; //Протоколирование предупреждений +const SKAFKA_LOG_LEVEL_INFO = "INFO"; //Протоколирование общей информации + +//Уровни протоколирования подключения MQTT +const SMQTT_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено +const SMQTT_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок +const SMQTT_LOG_LEVEL_INFO = "INFO"; //Протоколирование информации + //------------- // Тело модуля //------------- @@ -52,6 +67,21 @@ const validatePoolIncrementInComing = val => val >= 0 && val <= 1000; //Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений const validateTimeoutInComing = val => val >= 0; +//Функция проверки значения времени ожидания успешного подключения Kafka +const validateTimeoutKafka = val => val >= 0; + +//Функция проверки значения максимального ожидания между попытками переподключения Kafka +const validateMaxRetryTimeKafka = val => val >= 0; + +//Функция проверки значения максимального ожидания между попытками переподключения Kafka +const validateInitialRetryTimeKafka = val => val >= 0; + +//Функция проверки значения времени ожидания успешного подключения к MQTT +const validateConnectTimeoutMQTT = val => val >= 0; + +//Функция проверки значения времени ожидания между попытками переподключения к MQTT +const validateReconnectPeriodMQTT = val => val >= 0; + //Схема валидации общих параметров сервера приложений const common = new Schema({ //Наименование сервера приложений @@ -169,7 +199,8 @@ const outGoing = new Schema({ type: Boolean, required: true, message: { - type: path => `Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => + `Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, required: path => `Не указан признак проверки SSL-сертификатов адресов отправки сообщений (${path})` } }, @@ -340,6 +371,246 @@ const inComing = new Schema({ } }); +//Схема валидации параметров SSL подключения к Kafka +const kafkaSSL = new Schema({ + //Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить) + bRejectUnauthorized: { + type: Boolean, + required: true, + message: { + type: path => + `Признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path})` + } + }, + //Путь к корневому сертификату с информацией об удостоверяющем центре + sPathCa: { + type: String, + required: false, + message: { + type: path => `Путь к корневому сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //Путь к закрытому ключу + sPathKey: { + type: String, + required: false, + message: { + type: path => `Путь к закрытому ключу SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //Путь к сертификату + sPathCert: { + type: String, + required: false, + message: { + type: path => `Путь к сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + } +}); + +//Схема валидации параметров подключения Kafka +const kafka = new Schema({ + //Мнемокод сервиса обмена + sService: { + type: String, + required: false, + message: { + type: path => `Мнемокод сервиса обмена подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //ID клиента-отправителя + sClientIdSender: { + type: String, + required: true, + message: { + type: path => `ID клиента-отправителя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-отправителя подключения Kafka (${path})` + } + }, + //ID клиента-получателя + sClientIdRecipient: { + type: String, + required: true, + message: { + type: path => `ID клиента-получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-получателя подключения Kafka (${path})` + } + }, + //Группа получателя + sGroupId: { + type: String, + required: true, + message: { + type: path => `Группа получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указана группа получателя подключения Kafka (${path})` + } + }, + //Время ожидания успешного подключения (мс) + nConnectionTimeout: { + type: Number, + required: true, + use: { validateTimeoutKafka }, + message: { + type: path => `Время ожидания успешного подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания успешного подключения к Kafka (${path})`, + validateTimeoutKafka: path => `Время ожидания успешного подключения Kafka (${path}) должно быть неотрицательным целым числом` + } + }, + //Необходимость попытки переподключения при потере соединения + bRestartOnFailure: { + type: Boolean, + required: true, + message: { + type: path => `Признак необходимости попытки переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак необходимости попытки переподключения к Kafka (${path})` + } + }, + //Время максимального ожидания между попытками переподключения (мс) + nMaxRetryTime: { + type: Number, + required: true, + use: { validateMaxRetryTimeKafka }, + message: { + type: path => + `Время максимального ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время максимального ожидания между попытками переподключения к Kafka (${path})`, + validateMaxRetryTimeKafka: path => + `Время максимального ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом` + } + }, + //Время ожидания между попытками переподключения (мс) + nInitialRetryTime: { + type: Number, + required: true, + use: { validateInitialRetryTimeKafka }, + message: { + type: path => `Время ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания между попытками переподключения к Kafka (${path})`, + validateInitialRetryTimeKafka: path => + `Время ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом` + } + }, + //Уровень протоколирования подключения + sLogLevel: { + type: String, + enum: [SKAFKA_LOG_LEVEL_NOTHING, SKAFKA_LOG_LEVEL_ERROR, SKAFKA_LOG_LEVEL_WARN, SKAFKA_LOG_LEVEL_INFO], + required: true, + message: { + type: path => `Уровень протоколирования подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + enum: path => `Значение уровня протоколирования подключения к Kafka (${path}) не поддерживается`, + required: path => `Не указан уровень протоколирования подключения к Kafka (${path})` + } + }, + //Использовать аутентификацию по SSL-сертификату + bAuthSSL: { + type: Boolean, + required: true, + message: { + type: path => `Признак использования аутентификации по SSL к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак использования аутентификации по SSL к Kafka (${path})` + } + }, + //Параметры аутентификации по SSL-сертификату + ssl: { + schema: kafkaSSL, + required: true, + message: { + required: path => `Не указаны параметры аутентификации по SSL к Kafka (${path})` + } + } +}); + +//Описатель схемы валидации подключения к Kafka +const defKafka = (bRequired, sName) => { + return { + type: Array, + required: bRequired, + each: kafka, + message: { + type: `Список подключений Kafka (${sName}) имеет некорректный тип данных (ожидалось - Array)`, + required: `Не указан список подключений Kafka (${sName})` + } + }; +}; + +//Схема валидации параметров подключения MQTT +const mqtt = new Schema({ + //Мнемокод сервиса обмена + sService: { + type: String, + required: false, + message: { + type: path => `Мнемокод сервиса обмена подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //ID клиента-отправителя + sClientIdSender: { + type: String, + required: true, + message: { + type: path => `ID клиента-отправителя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-отправителя подключения MQTT (${path})` + } + }, + //ID клиента-получателя + sClientIdRecipient: { + type: String, + required: true, + message: { + type: path => `ID клиента-получателя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-получателя подключения MQTT (${path})` + } + }, + //Время ожидания успешного подключения (мс) + nConnectTimeout: { + type: Number, + required: true, + use: { validateConnectTimeoutMQTT }, + message: { + type: path => `Время ожидания успешного подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания успешного подключения к MQTT (${path})`, + validateConnectTimeoutMQTT: path => `Время ожидания успешного подключения к MQTT (${path}) должно быть неотрицательным целым числом` + } + }, + //Время ожидания между попытками переподключения (мс) + nReconnectPeriod: { + type: Number, + required: true, + use: { validateReconnectPeriodMQTT }, + message: { + type: path => `Время ожидания между попытками переподключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания между попытками переподключения к MQTT (${path})`, + validateReconnectPeriodMQTT: path => + `Время ожидания между попытками переподключения к MQTT (${path}) должно быть неотрицательным целым числом` + } + }, + //Уровень протоколирования подключения + sLogLevel: { + type: String, + enum: [SMQTT_LOG_LEVEL_NOTHING, SMQTT_LOG_LEVEL_ERROR, SMQTT_LOG_LEVEL_INFO], + required: true, + message: { + type: path => `Уровень протоколирования подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`, + enum: path => `Значение уровня протоколирования подключения к MQTT (${path}) не поддерживается`, + required: path => `Не указан уровень протоколирования подключения к MQTT (${path})` + } + } +}); + +//Описатель схемы валидации подключения к MQTT +const defMQTT = (bRequired, sName) => { + return { + type: Array, + required: bRequired, + each: mqtt, + message: { + type: `Список подключений MQTT (${sName}) имеет некорректный тип данных (ожидалось - Array)`, + required: `Не указан список подключений MQTT (${sName})` + } + }; +}; + //Схема валидации параметров отправки E-Mail уведомлений const mail = new Schema({ //Адреc сервера SMTP @@ -441,6 +712,22 @@ const config = new Schema({ required: path => `Не указаны параметры обработки очереди входящих сообщений (${path})` } }, + //Параметры подключения к Kafka + kafka: { + schema: defKafka(true, "kafka"), + required: true, + message: { + required: path => `Не указаны параметры подключения Kafka (${path})` + } + }, + //Параметры подключения к MQTT + mqtt: { + schema: defMQTT(true, "mqtt"), + required: true, + message: { + required: path => `Не указаны параметры подключения MQTT (${path})` + } + }, //Параметры отправки E-Mail уведомлений mail: { schema: mail,