ЦИТК-1012 - Добавлены параметры протоколирования для брокеров MQTT/KAFKA, устранена ошибка подключения с использующимся sGroupId
This commit is contained in:
parent
b573b16000
commit
1dc5b5d108
@ -82,7 +82,7 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
|
||||
//Прослушиваем отключение от сервера
|
||||
client.on("offline", () => {
|
||||
//Если требуется выдавать ошибку
|
||||
if (settings.sLogLevel === SLOG_ERROR) {
|
||||
if (settings.sLogLevel === SLOG_INFO) {
|
||||
//Выводим ошибку
|
||||
logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`);
|
||||
}
|
||||
@ -90,7 +90,7 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
|
||||
//Прослушиваем восстановление соединения
|
||||
client.on("connect", () => {
|
||||
//Если требуется выдавать предупреждение
|
||||
if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) {
|
||||
if (settings.sLogLevel === SLOG_INFO) {
|
||||
//Сообщим о восстановлении соединения
|
||||
logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`);
|
||||
}
|
||||
@ -98,7 +98,7 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
|
||||
//Прослушиваем ошибки
|
||||
client.on("error", e => {
|
||||
//Если требуется выдавать ошибку
|
||||
if (settings.sLogLevel === SLOG_ERROR) {
|
||||
if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) {
|
||||
//Выводим ошибку
|
||||
logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`);
|
||||
}
|
||||
|
||||
@ -9,6 +9,21 @@
|
||||
|
||||
const Schema = require("validate"); //Схемы валидации
|
||||
|
||||
//----------
|
||||
// Константы
|
||||
//----------
|
||||
|
||||
//Уровни протоколирования подключения Kafka
|
||||
const SKAFKA_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено
|
||||
const SKAFKA_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок
|
||||
const SKAFKA_LOG_LEVEL_WARN = "WARN"; //Протоколирование предупреждений
|
||||
const SKAFKA_LOG_LEVEL_INFO = "INFO"; //Протоколирование общей информации
|
||||
|
||||
//Уровни протоколирования подключения MQTT
|
||||
const SMQTT_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено
|
||||
const SMQTT_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок
|
||||
const SMQTT_LOG_LEVEL_INFO = "INFO"; //Протоколирование информации
|
||||
|
||||
//-------------
|
||||
// Тело модуля
|
||||
//-------------
|
||||
@ -52,6 +67,21 @@ const validatePoolIncrementInComing = val => val >= 0 && val <= 1000;
|
||||
//Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений
|
||||
const validateTimeoutInComing = val => val >= 0;
|
||||
|
||||
//Функция проверки значения времени ожидания успешного подключения Kafka
|
||||
const validateTimeoutKafka = val => val >= 0;
|
||||
|
||||
//Функция проверки значения максимального ожидания между попытками переподключения Kafka
|
||||
const validateMaxRetryTimeKafka = val => val >= 0;
|
||||
|
||||
//Функция проверки значения максимального ожидания между попытками переподключения Kafka
|
||||
const validateInitialRetryTimeKafka = val => val >= 0;
|
||||
|
||||
//Функция проверки значения времени ожидания успешного подключения к MQTT
|
||||
const validateConnectTimeoutMQTT = val => val >= 0;
|
||||
|
||||
//Функция проверки значения времени ожидания между попытками переподключения к MQTT
|
||||
const validateReconnectPeriodMQTT = val => val >= 0;
|
||||
|
||||
//Схема валидации общих параметров сервера приложений
|
||||
const common = new Schema({
|
||||
//Наименование сервера приложений
|
||||
@ -169,7 +199,8 @@ const outGoing = new Schema({
|
||||
type: Boolean,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||
type: path =>
|
||||
`Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
|
||||
required: path => `Не указан признак проверки SSL-сертификатов адресов отправки сообщений (${path})`
|
||||
}
|
||||
},
|
||||
@ -340,6 +371,246 @@ const inComing = new Schema({
|
||||
}
|
||||
});
|
||||
|
||||
//Схема валидации параметров SSL подключения к Kafka
|
||||
const kafkaSSL = new Schema({
|
||||
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
|
||||
bRejectUnauthorized: {
|
||||
type: Boolean,
|
||||
required: true,
|
||||
message: {
|
||||
type: path =>
|
||||
`Признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
|
||||
required: path => `Не указан признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Путь к корневому сертификату с информацией об удостоверяющем центре
|
||||
sPathCa: {
|
||||
type: String,
|
||||
required: false,
|
||||
message: {
|
||||
type: path => `Путь к корневому сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
|
||||
}
|
||||
},
|
||||
//Путь к закрытому ключу
|
||||
sPathKey: {
|
||||
type: String,
|
||||
required: false,
|
||||
message: {
|
||||
type: path => `Путь к закрытому ключу SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
|
||||
}
|
||||
},
|
||||
//Путь к сертификату
|
||||
sPathCert: {
|
||||
type: String,
|
||||
required: false,
|
||||
message: {
|
||||
type: path => `Путь к сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
//Схема валидации параметров подключения Kafka
|
||||
const kafka = new Schema({
|
||||
//Мнемокод сервиса обмена
|
||||
sService: {
|
||||
type: String,
|
||||
required: false,
|
||||
message: {
|
||||
type: path => `Мнемокод сервиса обмена подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
|
||||
}
|
||||
},
|
||||
//ID клиента-отправителя
|
||||
sClientIdSender: {
|
||||
type: String,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `ID клиента-отправителя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
required: path => `Не указан ID клиента-отправителя подключения Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//ID клиента-получателя
|
||||
sClientIdRecipient: {
|
||||
type: String,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `ID клиента-получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
required: path => `Не указан ID клиента-получателя подключения Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Группа получателя
|
||||
sGroupId: {
|
||||
type: String,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `Группа получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
required: path => `Не указана группа получателя подключения Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Время ожидания успешного подключения (мс)
|
||||
nConnectionTimeout: {
|
||||
type: Number,
|
||||
required: true,
|
||||
use: { validateTimeoutKafka },
|
||||
message: {
|
||||
type: path => `Время ожидания успешного подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||
required: path => `Не указано время ожидания успешного подключения к Kafka (${path})`,
|
||||
validateTimeoutKafka: path => `Время ожидания успешного подключения Kafka (${path}) должно быть неотрицательным целым числом`
|
||||
}
|
||||
},
|
||||
//Необходимость попытки переподключения при потере соединения
|
||||
bRestartOnFailure: {
|
||||
type: Boolean,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `Признак необходимости попытки переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
|
||||
required: path => `Не указан признак необходимости попытки переподключения к Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Время максимального ожидания между попытками переподключения (мс)
|
||||
nMaxRetryTime: {
|
||||
type: Number,
|
||||
required: true,
|
||||
use: { validateMaxRetryTimeKafka },
|
||||
message: {
|
||||
type: path =>
|
||||
`Время максимального ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||
required: path => `Не указано время максимального ожидания между попытками переподключения к Kafka (${path})`,
|
||||
validateMaxRetryTimeKafka: path =>
|
||||
`Время максимального ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом`
|
||||
}
|
||||
},
|
||||
//Время ожидания между попытками переподключения (мс)
|
||||
nInitialRetryTime: {
|
||||
type: Number,
|
||||
required: true,
|
||||
use: { validateInitialRetryTimeKafka },
|
||||
message: {
|
||||
type: path => `Время ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||
required: path => `Не указано время ожидания между попытками переподключения к Kafka (${path})`,
|
||||
validateInitialRetryTimeKafka: path =>
|
||||
`Время ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом`
|
||||
}
|
||||
},
|
||||
//Уровень протоколирования подключения
|
||||
sLogLevel: {
|
||||
type: String,
|
||||
enum: [SKAFKA_LOG_LEVEL_NOTHING, SKAFKA_LOG_LEVEL_ERROR, SKAFKA_LOG_LEVEL_WARN, SKAFKA_LOG_LEVEL_INFO],
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `Уровень протоколирования подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
enum: path => `Значение уровня протоколирования подключения к Kafka (${path}) не поддерживается`,
|
||||
required: path => `Не указан уровень протоколирования подключения к Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Использовать аутентификацию по SSL-сертификату
|
||||
bAuthSSL: {
|
||||
type: Boolean,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `Признак использования аутентификации по SSL к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
|
||||
required: path => `Не указан признак использования аутентификации по SSL к Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Параметры аутентификации по SSL-сертификату
|
||||
ssl: {
|
||||
schema: kafkaSSL,
|
||||
required: true,
|
||||
message: {
|
||||
required: path => `Не указаны параметры аутентификации по SSL к Kafka (${path})`
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
//Описатель схемы валидации подключения к Kafka
|
||||
const defKafka = (bRequired, sName) => {
|
||||
return {
|
||||
type: Array,
|
||||
required: bRequired,
|
||||
each: kafka,
|
||||
message: {
|
||||
type: `Список подключений Kafka (${sName}) имеет некорректный тип данных (ожидалось - Array)`,
|
||||
required: `Не указан список подключений Kafka (${sName})`
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
//Схема валидации параметров подключения MQTT
|
||||
const mqtt = new Schema({
|
||||
//Мнемокод сервиса обмена
|
||||
sService: {
|
||||
type: String,
|
||||
required: false,
|
||||
message: {
|
||||
type: path => `Мнемокод сервиса обмена подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`
|
||||
}
|
||||
},
|
||||
//ID клиента-отправителя
|
||||
sClientIdSender: {
|
||||
type: String,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `ID клиента-отправителя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
required: path => `Не указан ID клиента-отправителя подключения MQTT (${path})`
|
||||
}
|
||||
},
|
||||
//ID клиента-получателя
|
||||
sClientIdRecipient: {
|
||||
type: String,
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `ID клиента-получателя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
required: path => `Не указан ID клиента-получателя подключения MQTT (${path})`
|
||||
}
|
||||
},
|
||||
//Время ожидания успешного подключения (мс)
|
||||
nConnectTimeout: {
|
||||
type: Number,
|
||||
required: true,
|
||||
use: { validateConnectTimeoutMQTT },
|
||||
message: {
|
||||
type: path => `Время ожидания успешного подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||
required: path => `Не указано время ожидания успешного подключения к MQTT (${path})`,
|
||||
validateConnectTimeoutMQTT: path => `Время ожидания успешного подключения к MQTT (${path}) должно быть неотрицательным целым числом`
|
||||
}
|
||||
},
|
||||
//Время ожидания между попытками переподключения (мс)
|
||||
nReconnectPeriod: {
|
||||
type: Number,
|
||||
required: true,
|
||||
use: { validateReconnectPeriodMQTT },
|
||||
message: {
|
||||
type: path => `Время ожидания между попытками переподключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||
required: path => `Не указано время ожидания между попытками переподключения к MQTT (${path})`,
|
||||
validateReconnectPeriodMQTT: path =>
|
||||
`Время ожидания между попытками переподключения к MQTT (${path}) должно быть неотрицательным целым числом`
|
||||
}
|
||||
},
|
||||
//Уровень протоколирования подключения
|
||||
sLogLevel: {
|
||||
type: String,
|
||||
enum: [SMQTT_LOG_LEVEL_NOTHING, SMQTT_LOG_LEVEL_ERROR, SMQTT_LOG_LEVEL_INFO],
|
||||
required: true,
|
||||
message: {
|
||||
type: path => `Уровень протоколирования подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`,
|
||||
enum: path => `Значение уровня протоколирования подключения к MQTT (${path}) не поддерживается`,
|
||||
required: path => `Не указан уровень протоколирования подключения к MQTT (${path})`
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
//Описатель схемы валидации подключения к MQTT
|
||||
const defMQTT = (bRequired, sName) => {
|
||||
return {
|
||||
type: Array,
|
||||
required: bRequired,
|
||||
each: mqtt,
|
||||
message: {
|
||||
type: `Список подключений MQTT (${sName}) имеет некорректный тип данных (ожидалось - Array)`,
|
||||
required: `Не указан список подключений MQTT (${sName})`
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
//Схема валидации параметров отправки E-Mail уведомлений
|
||||
const mail = new Schema({
|
||||
//Адреc сервера SMTP
|
||||
@ -441,6 +712,22 @@ const config = new Schema({
|
||||
required: path => `Не указаны параметры обработки очереди входящих сообщений (${path})`
|
||||
}
|
||||
},
|
||||
//Параметры подключения к Kafka
|
||||
kafka: {
|
||||
schema: defKafka(true, "kafka"),
|
||||
required: true,
|
||||
message: {
|
||||
required: path => `Не указаны параметры подключения Kafka (${path})`
|
||||
}
|
||||
},
|
||||
//Параметры подключения к MQTT
|
||||
mqtt: {
|
||||
schema: defMQTT(true, "mqtt"),
|
||||
required: true,
|
||||
message: {
|
||||
required: path => `Не указаны параметры подключения MQTT (${path})`
|
||||
}
|
||||
},
|
||||
//Параметры отправки E-Mail уведомлений
|
||||
mail: {
|
||||
schema: mail,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user