From b573b16000ff04b2c4071980ac1c7c42a74cc632 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Mon, 8 Dec 2025 15:49:30 +0300 Subject: [PATCH 1/2] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-1012=20-=20=D0=94?= =?UTF-8?q?=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D1=8B=20=D0=BF=D0=B0?= =?UTF-8?q?=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D1=8B=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=B8=D1=80=D0=BE=D0=B2=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=B4=D0=BB=D1=8F=20=D0=B1=D1=80=D0=BE?= =?UTF-8?q?=D0=BA=D0=B5=D1=80=D0=BE=D0=B2=20MQTT/KAFKA,=20=D1=83=D1=81?= =?UTF-8?q?=D1=82=D1=80=D0=B0=D0=BD=D0=B5=D0=BD=D0=B0=20=D0=BE=D1=88=D0=B8?= =?UTF-8?q?=D0=B1=D0=BA=D0=B0=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D1=8F=20=D1=81=20=D0=B8=D1=81=D0=BF=D0=BE?= =?UTF-8?q?=D0=BB=D1=8C=D0=B7=D1=83=D1=8E=D1=89=D0=B8=D0=BC=D1=81=D1=8F=20?= =?UTF-8?q?sGroupId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 6 ++- core/constants.js | 10 ++++ core/kafka_connector.js | 94 +++++++++++++++++++++++++++++++++---- core/mqtt_connector.js | 43 ++++++++++++++--- core/out_queue_processor.js | 6 ++- 5 files changed, 141 insertions(+), 18 deletions(-) diff --git a/config.js b/config.js index 7bd3b7a..9395326 100644 --- a/config.js +++ b/config.js @@ -94,6 +94,8 @@ const kafka = [ nMaxRetryTime: 20000, //Время ожидания между попытками переподключения (мс) nInitialRetryTime: 10000, + //Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, WARN - предупреждения, INFO - общая информация) + sLogLevel: "NOTHING", //Использовать аутентификацию по SSL-сертификату bAuthSSL: false, //Параметры аутентификации по SSL-сертификату @@ -122,7 +124,9 @@ const mqtt = [ //Время ожидания успешного подключения (мс) nConnectTimeout: 5000, //Время ожидания между попытками переподключения (мс) - nReconnectPeriod: 10000 + nReconnectPeriod: 10000, + //Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, INFO - общая информация) + sLogLevel: "NOTHING" } ]; diff --git a/core/constants.js b/core/constants.js index 421d22e..b03d566 100644 --- a/core/constants.js +++ b/core/constants.js @@ -38,6 +38,16 @@ exports.SERR_APP_SERVER_BEFORE = "ERR_APP_SERVER_BEFORE"; //Ошибка пре exports.SERR_APP_SERVER_AFTER = "ERR_APP_SERVER_AFTER"; //Ошибка постобработчика exports.SERR_DB_SERVER = "ERR_DB_SERVER"; //Ошибка обработчика сервера БД +//Типовые коды ошибок брокера сообщений Kafka +exports.SERR_KAFKA_GROUP_UNAVAILABLE = "ERR_KAFKA_GROUP_UNAVAILABLE"; //Группа получателя недоступна +exports.SERR_KAFKA = "ERR_KAFKA"; //Ошибка +exports.SWARN_KAFKA = "WARN_KAFKA"; //Предупреждение +exports.SINFO_KAFKA = "INFO_KAFKA"; //Информация + +//Типовые коды MQTT +exports.SERR_MQTT = "ERR_MQTT"; //Ошибка +exports.SINFO_MQTT = "INFO_MQTT"; //Предупреждение + //Шаблоны подсветки консольных сообщений протокола работы exports.SCONSOLE_LOG_COLOR_PATTERN_ERR = "\x1b[31m%s\x1b[0m%s"; //Цвет для ошибок exports.SCONSOLE_LOG_COLOR_PATTERN_WRN = "\x1b[33m%s\x1b[0m%s"; //Цвет для предупреждений diff --git a/core/kafka_connector.js b/core/kafka_connector.js index 1ba72a0..990b8e6 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -9,19 +9,82 @@ const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka +const { SERR_KAFKA_GROUP_UNAVAILABLE, SERR_KAFKA, SWARN_KAFKA, SINFO_KAFKA } = require("./constants"); //Глобальные константы +const { ServerError } = require("./server_errors"); //Типовая ошибка + +//---------- +// Константы +//---------- + +//Общие константы работы Kafka +const NCHECK_GROUP_RETRIES = 1; //Количество попыток подключения для проверки доступности группы + +//-------------------------- +// Вспомогательные функции +//-------------------------- + +//Проверка доступности группы +const checkGroupAvailable = async (clientProps, groupId) => { + //Иницализируем подключение к Kafka + let client = new Kafka({ + ...clientProps, + retry: { retries: NCHECK_GROUP_RETRIES } + }); + //Инициализируем доступ к командам + const admin = client.admin(); + //Подключаемся + await admin.connect(); + //Считываем информацию о группе + const groupInfo = await admin.describeGroups([groupId]); + //Отключаемся + await admin.disconnect(); + //Если в данной группе есть участники + if (groupInfo?.groups[0]?.members && groupInfo.groups[0].members.length !== 0) { + //Сообщаем о невозможности запустить сервис + throw new ServerError(SERR_KAFKA_GROUP_UNAVAILABLE, `${SERR_KAFKA}: Группа получателя "${groupId}" активна.`); + } +}; + +//Логгер для вывода внутренних сообщений Kafka в общий поток +const KafkaLogger = selfLogger => { + return level => { + return async ({ log }) => { + //Считываем текст сообщения и доп. информацию + const { message, ...logFullInfo } = log; + //Убираем лишнюю информацию из доп. информации + const { stack, timestamp, logger, ...logInfo } = logFullInfo; + //Исходим от уровня ошибки + switch (level) { + //Ошибка + case logLevel.ERROR: + await selfLogger.error(`${SERR_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + //Предупреждение + case logLevel.WARN: + await selfLogger.warn(`${SWARN_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + //Информация + case logLevel.INFO: + await selfLogger.info(`${SINFO_KAFKA}: ${message} ${JSON.stringify(logInfo)}`); + break; + } + }; + }; +}; //------------ // Тело модуля //------------ //Отправка сообщения Kafka -const publishKafka = async ({ settings, url, auth, topic, message }) => { +const publishKafka = async ({ settings, url, auth, topic, message, logger }) => { //Иницализируем подключение к Kafka let kafka = new Kafka({ clientId: settings.sClientIdSender, brokers: [url], connectionTimeout: settings.nConnectionTimeout, - logLevel: logLevel.NOTHING, + logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, + logCreator: KafkaLogger(logger), ...auth }); //Инициализируем продюсера @@ -43,13 +106,20 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => let bLogLostConnection = true; //Получаем брокера по URL сервиса let sBroker = getKafkaBroker(service.sSrvRoot); - //Иницализируем подключение к Kafka - let client = new Kafka({ + //Формируем свойства подключения к Kafka + let clientProps = { clientId: settings.sClientIdRecipient, brokers: [sBroker], connectionTimeout: settings.nConnectionTimeout, ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), - logLevel: logLevel.NOTHING, + logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING, + logCreator: KafkaLogger(logger) + }; + //Проверка доступности группы + await checkGroupAvailable(clientProps, settings.sGroupId); + //Иницализируем подключение к Kafka + let client = new Kafka({ + ...clientProps, retry: { retries: 0, maxRetryTime: settings.nMaxRetryTime, @@ -59,7 +129,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Если требуется вывести ошибку if (bLogLostConnection) { //Выводим ошибку - logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`); + logger.error(`${SERR_KAFKA}: Соединение потеряно (${sBroker}): ${makeErrorText(error)}`); //Сбрасываем признак необходимости вывода ошибки bLogLostConnection = false; } @@ -90,7 +160,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => }) }); } catch (e) { - await logger.error(`Ошибка обработки входящего сообщения Kafka: ${makeErrorText(e)}`); + await logger.error(`${SERR_KAFKA}: Ошибка обработки входящего сообщения: ${makeErrorText(e)}`); } } }); @@ -99,7 +169,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Если сообщение о потере соединения уже выводилось if (!bLogLostConnection) { //Сообщим о восстановлении соединения - logger.info(`Соединение с Kafka восстановлено (${sBroker})`); + logger.info(`${SINFO_KAFKA}: Соединение восстановлено (${sBroker})`); //Устанавливаем признак сообщения о потере соединения bLogLostConnection = true; } @@ -107,7 +177,13 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) => //Возвращаем соединение return consumer; } catch (e) { - await logger.error(`Ошибка запуска обработчика очереди входящих сообщений Kafka: ${makeErrorText(e)}`); + //Если это фатальная ошибка - выдаем её + if (e.sCode === SERR_KAFKA_GROUP_UNAVAILABLE) { + throw new ServerError(e.sCode, e.sMessage); + } else { + //Если ошибка не фатальная - выводим информацию + await logger.error(`${SERR_KAFKA}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); + } } }; diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index e0af9e1..d0292aa 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -9,13 +9,22 @@ const { makeErrorText } = require("./utils"); //Вспомогательные функции const mqtt = require("mqtt"); //Работа с MQTT +const { SERR_MQTT, SINFO_MQTT } = require("./constants"); //Глобальные константы + +//---------- +// Константы +//---------- + +//Общие константы работы MQTT +const SLOG_ERROR = "ERROR"; //Уровень протоколирования - ошибки +const SLOG_INFO = "INFO"; //Уровень протоколирования - информация //------------ // Тело модуля //------------ //Отправка MQTT сообщения -const publishMQTT = async ({ settings, url, auth, topic, message }) => { +const publishMQTT = async ({ settings, url, auth, topic, message, logger }) => { //Инициализируем подключение const client = await mqtt.connectAsync(url, { clientId: settings.sClientIdSender, @@ -25,6 +34,14 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => { password: auth.pass, reconnectPeriod: settings.nReconnectPeriod }); + //Прослушиваем ошибки + client.on("error", e => { + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); + } + }); //Отправляем сообщение await client.publishAsync(topic, message); //Закрываем подключение @@ -64,18 +81,32 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { }); //Прослушиваем отключение от сервера client.on("offline", () => { - //Выводим ошибку - logger.error(`Соединение с MQTT потеряно (${sBroker})`); + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`); + } }); //Прослушиваем восстановление соединения client.on("connect", () => { - //Сообщим о восстановлении соединения - logger.info(`Соединение с MQTT восстановлено (${sBroker})`); + //Если требуется выдавать предупреждение + if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) { + //Сообщим о восстановлении соединения + logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`); + } + }); + //Прослушиваем ошибки + client.on("error", e => { + //Если требуется выдавать ошибку + if (settings.sLogLevel === SLOG_ERROR) { + //Выводим ошибку + logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); + } }); //Возвращаем подключение return client; } catch (e) { - logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`); + logger.error(`${SERR_MQTT}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`); } }; diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3583925..4d3f895 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -343,7 +343,8 @@ const appProcess = async prms => { url: options.url, auth: options.auth, topic: options.topic, - message: options.body + message: options.body, + logger }); break; //MQTT/MQTTS @@ -353,7 +354,8 @@ const appProcess = async prms => { url: options.url, auth: options.auth, topic: options.topic, - message: options.body + message: options.body, + logger }); break; //HTTP/HTTPS -- 2.34.1 From 1dc5b5d10850566ef25d02fb14c88ff3e3c3fc41 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Thu, 25 Dec 2025 20:32:01 +0300 Subject: [PATCH 2/2] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-1012=20-=20=D0=94?= =?UTF-8?q?=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D1=8B=20=D0=BF=D0=B0?= =?UTF-8?q?=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D1=8B=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=B8=D1=80=D0=BE=D0=B2=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=B4=D0=BB=D1=8F=20=D0=B1=D1=80=D0=BE?= =?UTF-8?q?=D0=BA=D0=B5=D1=80=D0=BE=D0=B2=20MQTT/KAFKA,=20=D1=83=D1=81?= =?UTF-8?q?=D1=82=D1=80=D0=B0=D0=BD=D0=B5=D0=BD=D0=B0=20=D0=BE=D1=88=D0=B8?= =?UTF-8?q?=D0=B1=D0=BA=D0=B0=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E=D1=87?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D1=8F=20=D1=81=20=D0=B8=D1=81=D0=BF=D0=BE?= =?UTF-8?q?=D0=BB=D1=8C=D0=B7=D1=83=D1=8E=D1=89=D0=B8=D0=BC=D1=81=D1=8F=20?= =?UTF-8?q?sGroupId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/mqtt_connector.js | 6 +- models/obj_config.js | 289 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 291 insertions(+), 4 deletions(-) diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index d0292aa..898a107 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -82,7 +82,7 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { //Прослушиваем отключение от сервера client.on("offline", () => { //Если требуется выдавать ошибку - if (settings.sLogLevel === SLOG_ERROR) { + if (settings.sLogLevel === SLOG_INFO) { //Выводим ошибку logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`); } @@ -90,7 +90,7 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { //Прослушиваем восстановление соединения client.on("connect", () => { //Если требуется выдавать предупреждение - if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) { + if (settings.sLogLevel === SLOG_INFO) { //Сообщим о восстановлении соединения logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`); } @@ -98,7 +98,7 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { //Прослушиваем ошибки client.on("error", e => { //Если требуется выдавать ошибку - if (settings.sLogLevel === SLOG_ERROR) { + if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) { //Выводим ошибку logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`); } diff --git a/models/obj_config.js b/models/obj_config.js index f17d7d4..7cea9df 100644 --- a/models/obj_config.js +++ b/models/obj_config.js @@ -9,6 +9,21 @@ const Schema = require("validate"); //Схемы валидации +//---------- +// Константы +//---------- + +//Уровни протоколирования подключения Kafka +const SKAFKA_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено +const SKAFKA_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок +const SKAFKA_LOG_LEVEL_WARN = "WARN"; //Протоколирование предупреждений +const SKAFKA_LOG_LEVEL_INFO = "INFO"; //Протоколирование общей информации + +//Уровни протоколирования подключения MQTT +const SMQTT_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено +const SMQTT_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок +const SMQTT_LOG_LEVEL_INFO = "INFO"; //Протоколирование информации + //------------- // Тело модуля //------------- @@ -52,6 +67,21 @@ const validatePoolIncrementInComing = val => val >= 0 && val <= 1000; //Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений const validateTimeoutInComing = val => val >= 0; +//Функция проверки значения времени ожидания успешного подключения Kafka +const validateTimeoutKafka = val => val >= 0; + +//Функция проверки значения максимального ожидания между попытками переподключения Kafka +const validateMaxRetryTimeKafka = val => val >= 0; + +//Функция проверки значения максимального ожидания между попытками переподключения Kafka +const validateInitialRetryTimeKafka = val => val >= 0; + +//Функция проверки значения времени ожидания успешного подключения к MQTT +const validateConnectTimeoutMQTT = val => val >= 0; + +//Функция проверки значения времени ожидания между попытками переподключения к MQTT +const validateReconnectPeriodMQTT = val => val >= 0; + //Схема валидации общих параметров сервера приложений const common = new Schema({ //Наименование сервера приложений @@ -169,7 +199,8 @@ const outGoing = new Schema({ type: Boolean, required: true, message: { - type: path => `Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => + `Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, required: path => `Не указан признак проверки SSL-сертификатов адресов отправки сообщений (${path})` } }, @@ -340,6 +371,246 @@ const inComing = new Schema({ } }); +//Схема валидации параметров SSL подключения к Kafka +const kafkaSSL = new Schema({ + //Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить) + bRejectUnauthorized: { + type: Boolean, + required: true, + message: { + type: path => + `Признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path})` + } + }, + //Путь к корневому сертификату с информацией об удостоверяющем центре + sPathCa: { + type: String, + required: false, + message: { + type: path => `Путь к корневому сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //Путь к закрытому ключу + sPathKey: { + type: String, + required: false, + message: { + type: path => `Путь к закрытому ключу SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //Путь к сертификату + sPathCert: { + type: String, + required: false, + message: { + type: path => `Путь к сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + } +}); + +//Схема валидации параметров подключения Kafka +const kafka = new Schema({ + //Мнемокод сервиса обмена + sService: { + type: String, + required: false, + message: { + type: path => `Мнемокод сервиса обмена подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //ID клиента-отправителя + sClientIdSender: { + type: String, + required: true, + message: { + type: path => `ID клиента-отправителя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-отправителя подключения Kafka (${path})` + } + }, + //ID клиента-получателя + sClientIdRecipient: { + type: String, + required: true, + message: { + type: path => `ID клиента-получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-получателя подключения Kafka (${path})` + } + }, + //Группа получателя + sGroupId: { + type: String, + required: true, + message: { + type: path => `Группа получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указана группа получателя подключения Kafka (${path})` + } + }, + //Время ожидания успешного подключения (мс) + nConnectionTimeout: { + type: Number, + required: true, + use: { validateTimeoutKafka }, + message: { + type: path => `Время ожидания успешного подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания успешного подключения к Kafka (${path})`, + validateTimeoutKafka: path => `Время ожидания успешного подключения Kafka (${path}) должно быть неотрицательным целым числом` + } + }, + //Необходимость попытки переподключения при потере соединения + bRestartOnFailure: { + type: Boolean, + required: true, + message: { + type: path => `Признак необходимости попытки переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак необходимости попытки переподключения к Kafka (${path})` + } + }, + //Время максимального ожидания между попытками переподключения (мс) + nMaxRetryTime: { + type: Number, + required: true, + use: { validateMaxRetryTimeKafka }, + message: { + type: path => + `Время максимального ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время максимального ожидания между попытками переподключения к Kafka (${path})`, + validateMaxRetryTimeKafka: path => + `Время максимального ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом` + } + }, + //Время ожидания между попытками переподключения (мс) + nInitialRetryTime: { + type: Number, + required: true, + use: { validateInitialRetryTimeKafka }, + message: { + type: path => `Время ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания между попытками переподключения к Kafka (${path})`, + validateInitialRetryTimeKafka: path => + `Время ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом` + } + }, + //Уровень протоколирования подключения + sLogLevel: { + type: String, + enum: [SKAFKA_LOG_LEVEL_NOTHING, SKAFKA_LOG_LEVEL_ERROR, SKAFKA_LOG_LEVEL_WARN, SKAFKA_LOG_LEVEL_INFO], + required: true, + message: { + type: path => `Уровень протоколирования подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`, + enum: path => `Значение уровня протоколирования подключения к Kafka (${path}) не поддерживается`, + required: path => `Не указан уровень протоколирования подключения к Kafka (${path})` + } + }, + //Использовать аутентификацию по SSL-сертификату + bAuthSSL: { + type: Boolean, + required: true, + message: { + type: path => `Признак использования аутентификации по SSL к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак использования аутентификации по SSL к Kafka (${path})` + } + }, + //Параметры аутентификации по SSL-сертификату + ssl: { + schema: kafkaSSL, + required: true, + message: { + required: path => `Не указаны параметры аутентификации по SSL к Kafka (${path})` + } + } +}); + +//Описатель схемы валидации подключения к Kafka +const defKafka = (bRequired, sName) => { + return { + type: Array, + required: bRequired, + each: kafka, + message: { + type: `Список подключений Kafka (${sName}) имеет некорректный тип данных (ожидалось - Array)`, + required: `Не указан список подключений Kafka (${sName})` + } + }; +}; + +//Схема валидации параметров подключения MQTT +const mqtt = new Schema({ + //Мнемокод сервиса обмена + sService: { + type: String, + required: false, + message: { + type: path => `Мнемокод сервиса обмена подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)` + } + }, + //ID клиента-отправителя + sClientIdSender: { + type: String, + required: true, + message: { + type: path => `ID клиента-отправителя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-отправителя подключения MQTT (${path})` + } + }, + //ID клиента-получателя + sClientIdRecipient: { + type: String, + required: true, + message: { + type: path => `ID клиента-получателя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан ID клиента-получателя подключения MQTT (${path})` + } + }, + //Время ожидания успешного подключения (мс) + nConnectTimeout: { + type: Number, + required: true, + use: { validateConnectTimeoutMQTT }, + message: { + type: path => `Время ожидания успешного подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания успешного подключения к MQTT (${path})`, + validateConnectTimeoutMQTT: path => `Время ожидания успешного подключения к MQTT (${path}) должно быть неотрицательным целым числом` + } + }, + //Время ожидания между попытками переподключения (мс) + nReconnectPeriod: { + type: Number, + required: true, + use: { validateReconnectPeriodMQTT }, + message: { + type: path => `Время ожидания между попытками переподключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания между попытками переподключения к MQTT (${path})`, + validateReconnectPeriodMQTT: path => + `Время ожидания между попытками переподключения к MQTT (${path}) должно быть неотрицательным целым числом` + } + }, + //Уровень протоколирования подключения + sLogLevel: { + type: String, + enum: [SMQTT_LOG_LEVEL_NOTHING, SMQTT_LOG_LEVEL_ERROR, SMQTT_LOG_LEVEL_INFO], + required: true, + message: { + type: path => `Уровень протоколирования подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`, + enum: path => `Значение уровня протоколирования подключения к MQTT (${path}) не поддерживается`, + required: path => `Не указан уровень протоколирования подключения к MQTT (${path})` + } + } +}); + +//Описатель схемы валидации подключения к MQTT +const defMQTT = (bRequired, sName) => { + return { + type: Array, + required: bRequired, + each: mqtt, + message: { + type: `Список подключений MQTT (${sName}) имеет некорректный тип данных (ожидалось - Array)`, + required: `Не указан список подключений MQTT (${sName})` + } + }; +}; + //Схема валидации параметров отправки E-Mail уведомлений const mail = new Schema({ //Адреc сервера SMTP @@ -441,6 +712,22 @@ const config = new Schema({ required: path => `Не указаны параметры обработки очереди входящих сообщений (${path})` } }, + //Параметры подключения к Kafka + kafka: { + schema: defKafka(true, "kafka"), + required: true, + message: { + required: path => `Не указаны параметры подключения Kafka (${path})` + } + }, + //Параметры подключения к MQTT + mqtt: { + schema: defMQTT(true, "mqtt"), + required: true, + message: { + required: path => `Не указаны параметры подключения MQTT (${path})` + } + }, //Параметры отправки E-Mail уведомлений mail: { schema: mail, -- 2.34.1