Инициализация ветки обновления node с версии 14 на версию 22, а также используемых сервером библиотек. Добавление модуля работы с запросами, изменение работы при клонировании переменных, отказ от module-alias и lodash

This commit is contained in:
boa604 2026-01-15 14:59:20 +03:00
parent 62786dcc5a
commit 97cb8516c3
21 changed files with 1923 additions and 2461 deletions

View File

@ -7,6 +7,16 @@
// Тело модуля // Тело модуля
//------------ //------------
//Переменные окружения
/*
Только для СУБД Oracle.
1. NODE_ORACLE_DB_THIN_MODE - Установите значение "1", чтобы использовать режим "тонкого клиента" (Thin-режим)
при подключении к БД (не требует установки Oracle Client).
2. ORACLE_CLIENT_LIB_DIR - Путь к директории с библиотеками Oracle Client,
используется только для работы в режиме "толстого клиента" (Thick-режим), если значение не указано,
то будет использоваться значение из переменной операционной системы PATH.
*/
//Общие параметры //Общие параметры
let common = { let common = {
//Наименование сервера приложений //Наименование сервера приложений
@ -14,7 +24,7 @@ let common = {
//Версия сервера приложений //Версия сервера приложений
sVersion: "8.5.6.1", sVersion: "8.5.6.1",
//Релиз сервера приложений //Релиз сервера приложений
sRelease: "2025.11.06", sRelease: "2025.12.24",
//Таймаут останова сервера (мс) //Таймаут останова сервера (мс)
nTerminateTimeout: 60000, nTerminateTimeout: 60000,
//Контролировать версию Системы //Контролировать версию Системы
@ -94,6 +104,8 @@ const kafka = [
nMaxRetryTime: 20000, nMaxRetryTime: 20000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nInitialRetryTime: 10000, nInitialRetryTime: 10000,
//Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, WARN - предупреждения, INFO - общая информация)
sLogLevel: "NOTHING",
//Использовать аутентификацию по SSL-сертификату //Использовать аутентификацию по SSL-сертификату
bAuthSSL: false, bAuthSSL: false,
//Параметры аутентификации по SSL-сертификату //Параметры аутентификации по SSL-сертификату
@ -122,7 +134,9 @@ const mqtt = [
//Время ожидания успешного подключения (мс) //Время ожидания успешного подключения (мс)
nConnectTimeout: 5000, nConnectTimeout: 5000,
//Время ожидания между попытками переподключения (мс) //Время ожидания между попытками переподключения (мс)
nReconnectPeriod: 10000 nReconnectPeriod: 10000,
//Уровень протоколирования подключения (NOTHING - ничего, ERROR - ошибки, INFO - общая информация)
sLogLevel: "NOTHING"
} }
]; ];

View File

@ -8,9 +8,9 @@
//----------------- //-----------------
//Путь к модулям //Путь к модулям
exports.SMODULES_PATH_CORE = "@core"; //Модули ядра exports.SMODULES_PATH_CORE = "."; //Модули ядра
exports.SMODULES_PATH_MODULES = "@modules"; //Дополнительные пользовательские модули exports.SMODULES_PATH_MODULES = "../modules"; //Дополнительные пользовательские модули
exports.SMODULES_PATH_MODELS = "@models"; //Модели данных и схемы валидации exports.SMODULES_PATH_MODELS = "../models"; //Модели данных и схемы валидации
//Типовые коды ошибок //Типовые коды ошибок
exports.SERR_COMMON = "ERR_COMMON"; //Общая ошибка exports.SERR_COMMON = "ERR_COMMON"; //Общая ошибка
@ -38,6 +38,16 @@ exports.SERR_APP_SERVER_BEFORE = "ERR_APP_SERVER_BEFORE"; //Ошибка пре
exports.SERR_APP_SERVER_AFTER = "ERR_APP_SERVER_AFTER"; //Ошибка постобработчика exports.SERR_APP_SERVER_AFTER = "ERR_APP_SERVER_AFTER"; //Ошибка постобработчика
exports.SERR_DB_SERVER = "ERR_DB_SERVER"; //Ошибка обработчика сервера БД exports.SERR_DB_SERVER = "ERR_DB_SERVER"; //Ошибка обработчика сервера БД
//Типовые коды ошибок брокера сообщений Kafka
exports.SERR_KAFKA_GROUP_UNAVAILABLE = "ERR_KAFKA_GROUP_UNAVAILABLE"; //Группа получателя недоступна
exports.SERR_KAFKA = "ERR_KAFKA"; //Ошибка
exports.SWARN_KAFKA = "WARN_KAFKA"; //Предупреждение
exports.SINFO_KAFKA = "INFO_KAFKA"; //Информация
//Типовые коды MQTT
exports.SERR_MQTT = "ERR_MQTT"; //Ошибка
exports.SINFO_MQTT = "INFO_MQTT"; //Предупреждение
//Шаблоны подсветки консольных сообщений протокола работы //Шаблоны подсветки консольных сообщений протокола работы
exports.SCONSOLE_LOG_COLOR_PATTERN_ERR = "\x1b[31m%s\x1b[0m%s"; //Цвет для ошибок exports.SCONSOLE_LOG_COLOR_PATTERN_ERR = "\x1b[31m%s\x1b[0m%s"; //Цвет для ошибок
exports.SCONSOLE_LOG_COLOR_PATTERN_WRN = "\x1b[33m%s\x1b[0m%s"; //Цвет для предупреждений exports.SCONSOLE_LOG_COLOR_PATTERN_WRN = "\x1b[33m%s\x1b[0m%s"; //Цвет для предупреждений

View File

@ -7,10 +7,9 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
const _ = require("lodash"); //Работа с массивами и объектами
const EventEmitter = require("events"); //Обработчик пользовательских событий const EventEmitter = require("events"); //Обработчик пользовательских событий
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции const { makeModuleFullPath, validateObject, deepClone } = require("./utils"); //Вспомогательные функции
const prmsDBConnectorSchema = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля const prmsDBConnectorSchema = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля
const intfDBConnectorModuleSchema = require("../models/intf_db_connector_module"); //Схема валидации интерфейса модуля взаимодействия с БД const intfDBConnectorModuleSchema = require("../models/intf_db_connector_module"); //Схема валидации интерфейса модуля взаимодействия с БД
const objServiceSchema = require("../models/obj_service"); //Схема валидации сервиса const objServiceSchema = require("../models/obj_service"); //Схема валидации сервиса
@ -69,7 +68,7 @@ class DBConnector extends EventEmitter {
throw new ServerError(SERR_MODULES_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_MODULES_BAD_INTERFACE, sCheckResult);
} }
//Всё успешно - сохраним настройки подключения //Всё успешно - сохраним настройки подключения
this.connectSettings = _.cloneDeep(prms.connectSettings); this.connectSettings = deepClone(prms.connectSettings);
//Инициализируем остальные свойства //Инициализируем остальные свойства
this.connection = null; this.connection = null;
this.bConnected = false; this.bConnected = false;
@ -176,7 +175,7 @@ class DBConnector extends EventEmitter {
//Забираем для каждого из сервисов список его функций //Забираем для каждого из сервисов список его функций
let srvsFuncs = srvs.map(async srv => { let srvsFuncs = srvs.map(async srv => {
const response = await this.getServiceFunctions({ nServiceId: srv.nId }); const response = await this.getServiceFunctions({ nServiceId: srv.nId });
let tmp = _.cloneDeep(srv); let tmp = deepClone(srv);
response.forEach(f => { response.forEach(f => {
tmp.functions.push(f); tmp.functions.push(f);
}); });
@ -211,7 +210,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let getServiceFunctionsData = _.cloneDeep(prms); let getServiceFunctionsData = deepClone(prms);
getServiceFunctionsData.connection = this.connection; getServiceFunctionsData.connection = this.connection;
//И выполним считывание функций сервиса //И выполним считывание функций сервиса
let res = await this.connector.getServiceFunctions(getServiceFunctionsData); let res = await this.connector.getServiceFunctions(getServiceFunctionsData);
@ -240,7 +239,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let getServiceContextData = _.cloneDeep(prms); let getServiceContextData = deepClone(prms);
getServiceContextData.connection = this.connection; getServiceContextData.connection = this.connection;
//И выполним считывание контекста сервиса //И выполним считывание контекста сервиса
let res = await this.connector.getServiceContext(getServiceContextData); let res = await this.connector.getServiceContext(getServiceContextData);
@ -269,7 +268,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let setServiceContextData = _.cloneDeep(prms); let setServiceContextData = deepClone(prms);
setServiceContextData.connection = this.connection; setServiceContextData.connection = this.connection;
//И выполним установку контекста сервиса //И выполним установку контекста сервиса
await this.connector.setServiceContext(setServiceContextData); await this.connector.setServiceContext(setServiceContextData);
@ -295,7 +294,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let clearServiceContextData = _.cloneDeep(prms); let clearServiceContextData = deepClone(prms);
clearServiceContextData.connection = this.connection; clearServiceContextData.connection = this.connection;
//И выполним очистку контекста сервиса //И выполним очистку контекста сервиса
await this.connector.clearServiceContext(clearServiceContextData); await this.connector.clearServiceContext(clearServiceContextData);
@ -321,7 +320,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let isServiceAuthData = _.cloneDeep(prms); let isServiceAuthData = deepClone(prms);
isServiceAuthData.connection = this.connection; isServiceAuthData.connection = this.connection;
//И выполним проверку атентифицированности сервиса //И выполним проверку атентифицированности сервиса
let res = await this.connector.isServiceAuth(isServiceAuthData); let res = await this.connector.isServiceAuth(isServiceAuthData);
@ -354,7 +353,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let putServiceAuthInQueueData = _.cloneDeep(prms); let putServiceAuthInQueueData = deepClone(prms);
putServiceAuthInQueueData.connection = this.connection; putServiceAuthInQueueData.connection = this.connection;
//И выполним постановку в очередь задания на аутентификацию сервиса //И выполним постановку в очередь задания на аутентификацию сервиса
await this.connector.putServiceAuthInQueue(putServiceAuthInQueueData); await this.connector.putServiceAuthInQueue(putServiceAuthInQueueData);
@ -384,7 +383,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let getServiceExpiredQueueInfoData = _.cloneDeep(prms); let getServiceExpiredQueueInfoData = deepClone(prms);
getServiceExpiredQueueInfoData.connection = this.connection; getServiceExpiredQueueInfoData.connection = this.connection;
//И выполним получение информации о просроченных сообщениях //И выполним получение информации о просроченных сообщениях
let res = await this.connector.getServiceExpiredQueueInfo(getServiceExpiredQueueInfoData); let res = await this.connector.getServiceExpiredQueueInfo(getServiceExpiredQueueInfoData);
@ -413,7 +412,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let logData = _.cloneDeep(prms); let logData = deepClone(prms);
logData.connection = this.connection; logData.connection = this.connection;
logData.nExsSrv = this.nExsSrv; logData.nExsSrv = this.nExsSrv;
//И выполним запись в журнал //И выполним запись в журнал
@ -436,7 +435,7 @@ class DBConnector extends EventEmitter {
//Запись информации в журнал работы //Запись информации в журнал работы
async putLogInf(sMsg, prms) { async putLogInf(sMsg, prms) {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let logData = _.cloneDeep(prms); let logData = deepClone(prms);
//Выставим сообщение и тип записи журнала //Выставим сообщение и тип записи журнала
logData.nLogState = objLogSchema.NLOG_STATE_INF; logData.nLogState = objLogSchema.NLOG_STATE_INF;
logData.sMsg = sMsg; logData.sMsg = sMsg;
@ -452,7 +451,7 @@ class DBConnector extends EventEmitter {
//Запись предупреждения в журнал работы //Запись предупреждения в журнал работы
async putLogWrn(sMsg, prms) { async putLogWrn(sMsg, prms) {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let logData = _.cloneDeep(prms); let logData = deepClone(prms);
//Выставим сообщение и тип записи журнала //Выставим сообщение и тип записи журнала
logData.nLogState = objLogSchema.NLOG_STATE_WRN; logData.nLogState = objLogSchema.NLOG_STATE_WRN;
logData.sMsg = sMsg; logData.sMsg = sMsg;
@ -468,7 +467,7 @@ class DBConnector extends EventEmitter {
//Запись ошибки в журнал работы //Запись ошибки в журнал работы
async putLogErr(sMsg, prms) { async putLogErr(sMsg, prms) {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let logData = _.cloneDeep(prms); let logData = deepClone(prms);
//Выставим сообщение и тип записи журнала //Выставим сообщение и тип записи журнала
logData.nLogState = objLogSchema.NLOG_STATE_ERR; logData.nLogState = objLogSchema.NLOG_STATE_ERR;
logData.sMsg = sMsg; logData.sMsg = sMsg;
@ -489,7 +488,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let getQueueData = _.cloneDeep(prms); let getQueueData = deepClone(prms);
getQueueData.connection = this.connection; getQueueData.connection = this.connection;
try { try {
//Исполняем действие в БД //Исполняем действие в БД
@ -518,7 +517,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let putQueueData = _.cloneDeep(prms); let putQueueData = deepClone(prms);
putQueueData.blMsg = prms.blMsg ? prms.blMsg : Buffer.from(""); putQueueData.blMsg = prms.blMsg ? prms.blMsg : Buffer.from("");
putQueueData.connection = this.connection; putQueueData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
@ -549,7 +548,7 @@ class DBConnector extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let getQueueOutgoingData = _.cloneDeep(prms); let getQueueOutgoingData = deepClone(prms);
getQueueOutgoingData.connection = this.connection; getQueueOutgoingData.connection = this.connection;
getQueueOutgoingData.nExsSrv = this.nExsSrv; getQueueOutgoingData.nExsSrv = this.nExsSrv;
//Выполняем считывание из БД //Выполняем считывание из БД
@ -577,7 +576,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let setStateData = _.cloneDeep(prms); let setStateData = deepClone(prms);
setStateData.connection = this.connection; setStateData.connection = this.connection;
try { try {
//Исполняем действие в БД //Исполняем действие в БД
@ -610,7 +609,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let getQueueMsgData = _.cloneDeep(prms); let getQueueMsgData = deepClone(prms);
getQueueMsgData.connection = this.connection; getQueueMsgData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
try { try {
@ -643,7 +642,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let setQueueMsgData = _.cloneDeep(prms); let setQueueMsgData = deepClone(prms);
if (!setQueueMsgData.blMsg) setQueueMsgData.blMsg = Buffer.from(""); if (!setQueueMsgData.blMsg) setQueueMsgData.blMsg = Buffer.from("");
setQueueMsgData.connection = this.connection; setQueueMsgData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
@ -677,7 +676,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let setQueueOptionsData = _.cloneDeep(prms); let setQueueOptionsData = deepClone(prms);
setQueueOptionsData.connection = this.connection; setQueueOptionsData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
try { try {
@ -710,7 +709,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let getQueueRespData = _.cloneDeep(prms); let getQueueRespData = deepClone(prms);
getQueueRespData.connection = this.connection; getQueueRespData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
try { try {
@ -743,7 +742,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let setQueueRespData = _.cloneDeep(prms); let setQueueRespData = deepClone(prms);
if (!setQueueRespData.blResp) setQueueRespData.blResp = Buffer.from(""); if (!setQueueRespData.blResp) setQueueRespData.blResp = Buffer.from("");
setQueueRespData.connection = this.connection; setQueueRespData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
@ -777,7 +776,7 @@ class DBConnector extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Подготовим параметры //Подготовим параметры
let setQueueOptionsRespData = _.cloneDeep(prms); let setQueueOptionsRespData = deepClone(prms);
setQueueOptionsRespData.connection = this.connection; setQueueOptionsRespData.connection = this.connection;
//Исполняем действие в БД //Исполняем действие в БД
try { try {
@ -835,7 +834,7 @@ class DBConnector extends EventEmitter {
//Исполняем действие в БД //Исполняем действие в БД
try { try {
//Подготовим параметры для передачи в БД //Подготовим параметры для передачи в БД
let execQueuePrcData = _.cloneDeep(prms); let execQueuePrcData = deepClone(prms);
execQueuePrcData.connection = this.connection; execQueuePrcData.connection = this.connection;
//И выполним обработчик со стороны БД //И выполним обработчик со стороны БД
let res = await this.connector.execQueuePrc(execQueuePrcData); let res = await this.connector.execQueuePrc(execQueuePrcData);

236
core/http_client.js Normal file
View File

@ -0,0 +1,236 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: работа с HTTP/HTTPS запросами
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const { URL } = require("url");
//--------------------------
// Локальные идентификаторы
//--------------------------
const DEFAULT_TIMEOUT = 30000; //Таймаут по умолчанию
//Выполнение HTTP/HTTPS запроса
const httpRequest = async (rawOptions = {}) => {
try {
//Нормализуем параметры запроса
const options = normalizeOptions(rawOptions);
//Сформируем ссылку
const url = buildURL(options.url, options.query);
//Подготовим заголовки и тело запроса
const headers = prepareHeaders(options.headers);
const { body, contentLength, isStream } = prepareBody({
body: options.body,
json: options.json,
headers
});
//Если не указан размер тела
if (contentLength !== undefined && !headers.has("content-length")) {
//Установим размер тела в заголовок
headers.set("content-length", String(contentLength));
}
//Выполним запрос с использованием fetch
return httpRequestFetch({ options, url, headers, body, isStream });
} catch (e) {
throw e;
}
};
//Ошибка HTTP-запроса
class HttpError extends Error {
constructor(message, response) {
super(message);
this.name = "HttpError";
this.response = response;
}
}
//Выполнение запроса с использованием fetch
const httpRequestFetch = async ({ options, url, headers, body, isStream }) => {
try {
const fetchImpl = globalThis.fetch;
if (typeof fetchImpl !== "function") {
throw new Error("globalThis.fetch недоступен. Используйте Node.js >= 18.");
}
let requestBody = body;
if (isStream && body && typeof body.pipe === "function") {
const chunks = [];
try {
for await (const chunk of body) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
requestBody = chunks.length > 0 ? Buffer.concat(chunks) : undefined;
} catch (streamError) {
throw new Error(`Ошибка чтения потока: ${streamError.message}`);
}
}
const timeoutController = new AbortController();
const signals = [];
if (options.signal && options.signal instanceof AbortSignal) {
signals.push(options.signal);
}
signals.push(timeoutController.signal);
const combinedSignal = AbortSignal.any(signals);
const timeoutId =
options.timeout > 0
? setTimeout(
() => timeoutController.abort(new Error(`Время ожидания выполнения запроса истекло после ${options.timeout} мс`)),
options.timeout
)
: null;
try {
const fetchOptions = {
method: options.method,
headers: Object.fromEntries(headers),
body: requestBody,
signal: combinedSignal,
redirect: options.followRedirects ? "follow" : "manual"
};
if (options.proxy || options.ca || options.cert || options.key || options.passphrase || options.rejectUnauthorized === false) {
const { Agent, ProxyAgent } = require("undici");
const connectOptions = {
rejectUnauthorized: options.rejectUnauthorized !== undefined ? options.rejectUnauthorized : true
};
if (options.ca) {
connectOptions.ca = Array.isArray(options.ca) ? options.ca : [options.ca];
}
if (options.cert) {
connectOptions.cert = options.cert;
}
if (options.key) {
connectOptions.key = options.key;
}
if (options.passphrase) {
connectOptions.passphrase = options.passphrase;
}
if (options.proxy) {
const proxyUrl = new URL(options.proxy);
if (proxyUrl.protocol !== "http:" && proxyUrl.protocol !== "https:") {
throw new Error("Поддерживаются только HTTP/HTTPS-прокси (protocol=http|https).");
}
const proxyAuth =
proxyUrl.username || proxyUrl.password
? `Basic ${Buffer.from(`${decodeURIComponent(proxyUrl.username)}:${decodeURIComponent(proxyUrl.password)}`).toString(
"base64"
)}`
: null;
fetchOptions.dispatcher = new ProxyAgent({
uri: options.proxy,
token: proxyAuth,
connect: connectOptions
});
} else {
fetchOptions.dispatcher = new Agent({
connect: connectOptions
});
}
}
const response = await fetchImpl(url.toString(), fetchOptions);
const responseBody = Buffer.from(await response.arrayBuffer());
const result = {
statusCode: response.status,
headers: Object.fromEntries(response.headers.entries()),
body: responseBody,
ok: response.ok,
url: response.url
};
if (options.throwOnErrorStatus && !result.ok) {
throw new HttpError(`Запрос не выполнен со статусом ${result.statusCode}`, result);
}
return result;
} finally {
if (timeoutId) clearTimeout(timeoutId);
}
} catch (e) {
throw e;
}
};
//Нормализация параметров запроса
const normalizeOptions = options => {
if (!options || typeof options !== "object") {
throw new TypeError("options должен быть объектом");
}
if (!options.url) throw new Error("options.url обязателен");
return {
method: (options.method || "GET").toUpperCase(),
url: options.url,
headers: options.headers || {},
query: options.query || options.qs || {},
body: options.body,
json: options.json,
timeout: options.timeout ?? DEFAULT_TIMEOUT,
followRedirects: options.followRedirects ?? false,
proxy: options.proxy || null,
ca: options.ca,
cert: options.cert,
key: options.key,
passphrase: options.passphrase,
rejectUnauthorized: options.rejectUnauthorized !== undefined ? options.rejectUnauthorized : true,
throwOnErrorStatus: options.throwOnErrorStatus ?? false,
signal: options.signal || null
};
};
//Формирование ссылки
const buildURL = (base, query = {}) => {
const url = new URL(base);
Object.entries(query).forEach(([key, value]) => {
if (value === undefined || value === null) return;
url.searchParams.append(key, String(value));
});
return url;
};
//Подготовка заголовков запроса
const prepareHeaders = (inputHeaders = {}) => {
const headers = new Map();
Object.entries(inputHeaders).forEach(([key, value]) => {
if (value === undefined || value === null) return;
headers.set(String(key).toLowerCase(), String(value));
});
return headers;
};
//Подготовка тела запроса
const prepareBody = ({ body, json, headers }) => {
if (json !== undefined) {
const payload = Buffer.from(JSON.stringify(json));
if (!headers.has("content-type")) headers.set("content-type", "application/json; charset=utf-8");
return { body: payload, contentLength: payload.length, isStream: false };
}
if (body === undefined || body === null) return { body: undefined, contentLength: undefined, isStream: false };
if (Buffer.isBuffer(body) || body instanceof Uint8Array) {
return { body, contentLength: body.length, isStream: false };
}
if (typeof body === "string") {
const payload = Buffer.from(body);
return { body: payload, contentLength: payload.length, isStream: false };
}
if (typeof body.pipe === "function") {
return { body, contentLength: undefined, isStream: true };
}
const payload = Buffer.from(JSON.stringify(body));
if (!headers.has("content-type")) headers.set("content-type", "application/json; charset=utf-8");
return { body: payload, contentLength: payload.length, isStream: false };
};
exports.httpRequest = httpRequest;
exports.HttpError = HttpError;

View File

@ -7,11 +7,9 @@
// Подключение внешних библиотек // Подключение внешних библиотек
//------------------------------ //------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий const EventEmitter = require("events"); //Обработчик пользовательских событий
const express = require("express"); //WEB-сервер Express const express = require("express"); //WEB-сервер Express
const cors = require("cors"); //Управление заголовками безопасности для WEB-сервера Express const cors = require("cors"); //Управление заголовками безопасности для WEB-сервера Express
const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса)
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { const {
makeErrorText, makeErrorText,
@ -23,6 +21,7 @@ const {
deepMerge, deepMerge,
deepCopyObject, deepCopyObject,
isUndefined, isUndefined,
deepClone,
getKafkaConnectionSettings, getKafkaConnectionSettings,
getMQTTConnectionSettings, getMQTTConnectionSettings,
getURLProtocol getURLProtocol
@ -67,13 +66,13 @@ class InQueue extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Общие параметры сервера приложений //Общие параметры сервера приложений
this.common = _.cloneDeep(prms.common); this.common = deepClone(prms.common);
//Список обслуживаемых сервисов //Список обслуживаемых сервисов
this.services = null; this.services = null;
//Признак функционирования обработчика //Признак функционирования обработчика
this.bWorking = false; this.bWorking = false;
//Параметры очереди //Параметры очереди
this.inComing = _.cloneDeep(prms.inComing); this.inComing = deepClone(prms.inComing);
//Запомним подключение к БД //Запомним подключение к БД
this.dbConn = prms.dbConn; this.dbConn = prms.dbConn;
//Запомним логгер //Запомним логгер
@ -82,8 +81,8 @@ class InQueue extends EventEmitter {
this.notifier = prms.notifier; this.notifier = prms.notifier;
//WEB-приложение //WEB-приложение
this.webApp = express(); this.webApp = express();
// Глобально разрешаем CORS для всех маршрутов и методов.
this.webApp.use(cors()); this.webApp.use(cors());
this.webApp.options("*", cors());
//WEB-сервер //WEB-сервер
this.srv = null; this.srv = null;
//Параметры подключения к Kafka //Параметры подключения к Kafka
@ -130,16 +129,16 @@ class InQueue extends EventEmitter {
prms.function.nFnPrmsType prms.function.nFnPrmsType
) )
) { ) {
blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; blMsg = prms.req.body && !(Object.keys(prms.req.body ?? {}).length === 0) ? Buffer.from(JSON.stringify(prms.req.body)) : null;
} else { } else {
//Для GET, HEAD, DELETE, CONNECT, OPTIONS и TRACE - параметры запроса //Для GET, HEAD, DELETE, CONNECT, OPTIONS и TRACE - параметры запроса
if (!_.isEmpty(prms.req.query)) blMsg = Buffer.from(JSON.stringify(prms.req.query)); if (!(Object.keys(prms.req.query ?? {}).length === 0)) blMsg = Buffer.from(JSON.stringify(prms.req.query));
} }
//Определимся с параметрами сообщения полученными от внешней системы //Определимся с параметрами сообщения полученными от внешней системы
options = { options = {
method: prms.req.method, method: prms.req.method,
qs: _.cloneDeep(prms.req.query), qs: deepClone(prms.req.query),
headers: _.cloneDeep(prms.req.headers), headers: deepClone(prms.req.headers),
ip: prms.req.ip, ip: prms.req.ip,
hostName: prms.req.hostname, hostName: prms.req.hostname,
protocol: prms.req.protocol, protocol: prms.req.protocol,
@ -152,8 +151,6 @@ class InQueue extends EventEmitter {
sOptions: buildOptionsXML({ options }), sOptions: buildOptionsXML({ options }),
blMsg blMsg
}); });
//Запомним идентификатор записи очереди в запросе
prms.req.nQId = q.nId;
//Скажем что пришло новое входящее сообщение //Скажем что пришло новое входящее сообщение
await this.logger.info( await this.logger.info(
`Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({ `Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({
@ -173,11 +170,11 @@ class InQueue extends EventEmitter {
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);
let resBefore = null; let resBefore = null;
try { try {
let resBeforePrms = _.cloneDeep(prms); let resBeforePrms = deepClone(prms);
resBeforePrms.queue = _.cloneDeep(q); resBeforePrms.queue = deepClone(q);
resBeforePrms.queue.blMsg = blMsg; resBeforePrms.queue.blMsg = blMsg;
resBeforePrms.queue.blResp = blResp; resBeforePrms.queue.blResp = blResp;
resBeforePrms.options = _.cloneDeep(options); resBeforePrms.options = deepClone(options);
resBeforePrms.dbConn = this.dbConn; resBeforePrms.dbConn = this.dbConn;
resBeforePrms.notifier = this.notifier; resBeforePrms.notifier = this.notifier;
resBeforePrms.res = prms.res; resBeforePrms.res = prms.res;
@ -200,7 +197,7 @@ class InQueue extends EventEmitter {
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
}); });
//Фиксируем результат исполнения "До" - обработанный запрос внешней системы //Фиксируем результат исполнения "До" - обработанный запрос внешней системы
if (!_.isUndefined(resBefore.blMsg)) { if (resBefore.blMsg !== undefined) {
blMsg = resBefore.blMsg; blMsg = resBefore.blMsg;
q = await this.dbConn.setQueueMsg({ q = await this.dbConn.setQueueMsg({
nQueueId: q.nId, nQueueId: q.nId,
@ -208,7 +205,7 @@ class InQueue extends EventEmitter {
}); });
} }
//Фиксируем результат исполнения "До" - ответ на запрос //Фиксируем результат исполнения "До" - ответ на запрос
if (!_.isUndefined(resBefore.blResp)) { if (resBefore.blResp !== undefined) {
blResp = resBefore.blResp; blResp = resBefore.blResp;
q = await this.dbConn.setQueueResp({ q = await this.dbConn.setQueueResp({
nQueueId: q.nId, nQueueId: q.nId,
@ -217,16 +214,16 @@ class InQueue extends EventEmitter {
}); });
} }
//Фиксируем результат исполнения "До" - параметры ответа на запрос //Фиксируем результат исполнения "До" - параметры ответа на запрос
if (!_.isUndefined(resBefore.optionsResp)) { if (resBefore.optionsResp !== undefined) {
optionsResp = deepMerge(optionsResp, resBefore.optionsResp); optionsResp = deepMerge(optionsResp, resBefore.optionsResp);
let sOptionsResp = buildOptionsXML({ options: optionsResp }); let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
} }
//Фиксируем результат исполнения "До" - флаг ошибочной аутентификации - если он поднят, то это ошибка, дальше ничего не делаем //Фиксируем результат исполнения "До" - флаг ошибочной аутентификации - если он поднят, то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resBefore.bUnAuth) && resBefore.bUnAuth === true) if (resBefore.bUnAuth !== undefined && resBefore.bUnAuth === true)
throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
//Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем //Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; if (resBefore.bStopPropagation !== undefined && resBefore.bStopPropagation === true) bStopPropagation = true;
} else { } else {
//Или расскажем об ошибке //Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -281,12 +278,12 @@ class InQueue extends EventEmitter {
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
let resAfter = null; let resAfter = null;
try { try {
let resAfterPrms = _.cloneDeep(prms); let resAfterPrms = deepClone(prms);
resAfterPrms.queue = _.cloneDeep(q); resAfterPrms.queue = deepClone(q);
resAfterPrms.queue.blMsg = blMsg; resAfterPrms.queue.blMsg = blMsg;
resAfterPrms.queue.blResp = blResp; resAfterPrms.queue.blResp = blResp;
resAfterPrms.options = _.cloneDeep(options); resAfterPrms.options = deepClone(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp); resAfterPrms.optionsResp = deepClone(optionsResp);
resAfterPrms.dbConn = this.dbConn; resAfterPrms.dbConn = this.dbConn;
resAfterPrms.notifier = this.notifier; resAfterPrms.notifier = this.notifier;
resAfter = await fnAfter(resAfterPrms); resAfter = await fnAfter(resAfterPrms);
@ -308,7 +305,7 @@ class InQueue extends EventEmitter {
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
}); });
//Фиксируем результат исполнения "После" - ответ системы //Фиксируем результат исполнения "После" - ответ системы
if (!_.isUndefined(resAfter.blResp)) { if (resAfter.blResp !== undefined) {
blResp = resAfter.blResp; blResp = resAfter.blResp;
q = await this.dbConn.setQueueResp({ q = await this.dbConn.setQueueResp({
nQueueId: q.nId, nQueueId: q.nId,
@ -317,13 +314,13 @@ class InQueue extends EventEmitter {
}); });
} }
//Фиксируем результат исполнения "После" - параметры ответа на запрос //Фиксируем результат исполнения "После" - параметры ответа на запрос
if (!_.isUndefined(resAfter.optionsResp)) { if (resAfter.optionsResp !== undefined) {
optionsResp = deepMerge(optionsResp, resAfter.optionsResp); optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
let sOptionsResp = buildOptionsXML({ options: optionsResp }); let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
} }
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем //Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth !== undefined)
if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
} else { } else {
//Или расскажем об ошибке //Или расскажем об ошибке
@ -331,19 +328,13 @@ class InQueue extends EventEmitter {
} }
} }
} }
//Всё успешно - отдаём результат клиенту, если ещё не отдали //Если мы еще не отдали ответ от сервера
if (bStopPropagation === false && !prms.res.writableFinished) { if (!prms.res.writableFinished) {
//Всё успешно - отдаём результат клиенту
if (bStopPropagation === false) {
if (optionsResp.headers) prms.res.set(optionsResp.headers); if (optionsResp.headers) prms.res.set(optionsResp.headers);
prms.res.status(optionsResp.statusCode || 200).send(blResp); prms.res.status(optionsResp.statusCode || 200).send(blResp);
} }
//Если отправка ответа была прервана по таймауту
if (prms.req.bIsTimedOut === true) {
//Вернем ошибку обработчика с информацией об этом
throw new ServerError(
SERR_WEB_SERVER,
"Истекло время ожидания обработки входящего запроса. Канал закрыт. Клиенту был отправлен ответ с ошибкой истечения таймаута (504)."
);
} else {
//Фиксируем успех обработки - в протоколе работы сервиса //Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения //Фиксируем успех обработки - в статусе сообщения
@ -352,6 +343,12 @@ class InQueue extends EventEmitter {
nIncExecCnt: NINC_EXEC_CNT_YES, nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
}); });
} else {
//Или расскажем об ошибке
throw new ServerError(
SERR_WEB_SERVER,
"Истекло время ожидания обработки входящего запроса. Канал закрыт. Клиенту был отправлен ответ."
);
} }
} catch (e) { } catch (e) {
//Тема и текст уведомления об ошибке //Тема и текст уведомления об ошибке
@ -557,16 +554,12 @@ class InQueue extends EventEmitter {
if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream"; if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream";
next(); next();
}); });
//Если требуется установить таймаут на обработку сообщений
if (this.inComing.nTimeout !== 0) {
//Конфигурируем сервер - устанавливаем таймаут обработки сообщений //Конфигурируем сервер - устанавливаем таймаут обработки сообщений
this.webApp.use((req, res, next) => { this.webApp.use((req, res, next) => {
//Поднимем флаг истечения таймаута обработки
req.bIsTimedOut = false;
//Если требуется установить таймаут на обработку сообщений
if (this.inComing.nTimeout !== 0)
//Устанавливаем таймаут на ответ от сервера //Устанавливаем таймаут на ответ от сервера
res.setTimeout(this.inComing.nTimeout, () => { res.setTimeout(this.inComing.nTimeout, () => {
//Поднимем флаг исчетечение таймаута обработки
req.bIsTimedOut = true;
//Формируем ошибку //Формируем ошибку
let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса."); let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса.");
err.status = 504; err.status = 504;
@ -575,17 +568,20 @@ class InQueue extends EventEmitter {
}); });
next(); next();
}); });
}
//Конфигурируем сервер - обработка тела сообщения //Конфигурируем сервер - обработка тела сообщения
this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); this.webApp.use(express.json());
this.webApp.use(express.urlencoded({ extended: true }));
this.webApp.use(express.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" }));
//Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений
_.forEach( this.services
_.filter(this.services, srv => { .filter(srv => {
return ( return (
srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE && srv.nSrvType === objServiceSchema.NSRV_TYPE_RECIVE &&
[objServiceSchema.SPROTOCOL_HTTP, objServiceSchema.SPROTOCOL_HTTPS].includes(getURLProtocol(srv.sSrvRoot)) [objServiceSchema.SPROTOCOL_HTTP, objServiceSchema.SPROTOCOL_HTTPS].includes(getURLProtocol(srv.sSrvRoot))
); );
}), })
srvs => { .forEach(srvs => {
//Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает
this.webApp.all(srvs.sSrvRoot, (req, res) => { this.webApp.all(srvs.sSrvRoot, (req, res) => {
res.status(200).send( res.status(200).send(
@ -593,19 +589,18 @@ class InQueue extends EventEmitter {
); );
}); });
//Для всех статических функций сервиса... //Для всех статических функций сервиса...
_.forEach( srvs.functions
_.filter(srvs.functions, fn => fn.sFnURL.startsWith("@")), .filter(fn => fn.sFnURL.startsWith("@"))
fn => { .forEach(fn => {
this.webApp.use( this.webApp.use(
buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }), buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL.substr(1) }),
express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`) express.static(`${this.inComing.sStaticDir}/${fn.sFnURL.substr(1)}`)
); );
} });
);
//Для всех функций сервиса (кроме статических)... //Для всех функций сервиса (кроме статических)...
_.forEach( srvs.functions
_.filter(srvs.functions, fn => !fn.sFnURL.startsWith("@")), .filter(fn => !fn.sFnURL.startsWith("@"))
fn => { .forEach(fn => {
//...собственный обработчик, в зависимости от указанного способа передачи параметров //...собственный обработчик, в зависимости от указанного способа передачи параметров
this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => { this.webApp[fn.sFnPrmsType.toLowerCase()](buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (req, res) => {
try { try {
@ -629,16 +624,13 @@ class InQueue extends EventEmitter {
//Протоколируем в журнал работы сервера //Протоколируем в журнал работы сервера
await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
nServiceId: srvs.nId, nServiceId: srvs.nId,
nServiceFnId: fn.nId, nServiceFnId: fn.nId
nQueueId: req.nQId || null
}); });
//Отправим ошибку клиенту //Отправим ошибку клиенту
res.status(err.status || 500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); res.status(err.status || 500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
}); });
} });
); });
}
);
//Инициализируем настройки подключения //Инициализируем настройки подключения
let connectionSettings = null; let connectionSettings = null;
//Считываем прием сообщений по Kafka //Считываем прием сообщений по Kafka
@ -713,7 +705,7 @@ class InQueue extends EventEmitter {
} }
} }
//Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND //Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND
this.webApp.use("*", (req, res) => { this.webApp.use((req, res) => {
res.status(404).send( res.status(404).send(
`<html><body><center><br><h1>Сервер приложений ПП Парус 8<br>(${this.common.sVersion} релиз ${this.common.sRelease})</h1><h3>Запрошенный адрес не найден</h3></center></body></html>` `<html><body><center><br><h1>Сервер приложений ПП Парус 8<br>(${this.common.sVersion} релиз ${this.common.sRelease})</h1><h3>Запрошенный адрес не найден</h3></center></body></html>`
); );

View File

@ -9,19 +9,82 @@
const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции const { makeErrorText, getKafkaBroker, getKafkaAuth } = require("./utils"); //Вспомогательные функции
const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka const { Kafka, Partitioners, logLevel } = require("kafkajs"); //Работа с Kafka
const { SERR_KAFKA_GROUP_UNAVAILABLE, SERR_KAFKA, SWARN_KAFKA, SINFO_KAFKA } = require("./constants"); //Глобальные константы
const { ServerError } = require("./server_errors"); //Типовая ошибка
//----------
// Константы
//----------
//Общие константы работы Kafka
const NCHECK_GROUP_RETRIES = 1; //Количество попыток подключения для проверки доступности группы
//--------------------------
// Вспомогательные функции
//--------------------------
//Проверка доступности группы
const checkGroupAvailable = async (clientProps, groupId) => {
//Иницализируем подключение к Kafka
let client = new Kafka({
...clientProps,
retry: { retries: NCHECK_GROUP_RETRIES }
});
//Инициализируем доступ к командам
const admin = client.admin();
//Подключаемся
await admin.connect();
//Считываем информацию о группе
const groupInfo = await admin.describeGroups([groupId]);
//Отключаемся
await admin.disconnect();
//Если в данной группе есть участники
if (groupInfo?.groups[0]?.members && groupInfo.groups[0].members.length !== 0) {
//Сообщаем о невозможности запустить сервис
throw new ServerError(SERR_KAFKA_GROUP_UNAVAILABLE, `${SERR_KAFKA}: Группа получателя "${groupId}" активна.`);
}
};
//Логгер для вывода внутренних сообщений Kafka в общий поток
const KafkaLogger = selfLogger => {
return level => {
return async ({ log }) => {
//Считываем текст сообщения и доп. информацию
const { message, ...logFullInfo } = log;
//Убираем лишнюю информацию из доп. информации
const { stack, timestamp, logger, ...logInfo } = logFullInfo;
//Исходим от уровня ошибки
switch (level) {
//Ошибка
case logLevel.ERROR:
await selfLogger.error(`${SERR_KAFKA}: ${message} ${JSON.stringify(logInfo)}`);
break;
//Предупреждение
case logLevel.WARN:
await selfLogger.warn(`${SWARN_KAFKA}: ${message} ${JSON.stringify(logInfo)}`);
break;
//Информация
case logLevel.INFO:
await selfLogger.info(`${SINFO_KAFKA}: ${message} ${JSON.stringify(logInfo)}`);
break;
}
};
};
};
//------------ //------------
// Тело модуля // Тело модуля
//------------ //------------
//Отправка сообщения Kafka //Отправка сообщения Kafka
const publishKafka = async ({ settings, url, auth, topic, message }) => { const publishKafka = async ({ settings, url, auth, topic, message, logger }) => {
//Иницализируем подключение к Kafka //Иницализируем подключение к Kafka
let kafka = new Kafka({ let kafka = new Kafka({
clientId: settings.sClientIdSender, clientId: settings.sClientIdSender,
brokers: [url], brokers: [url],
connectionTimeout: settings.nConnectionTimeout, connectionTimeout: settings.nConnectionTimeout,
logLevel: logLevel.NOTHING, logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING,
logCreator: KafkaLogger(logger),
...auth ...auth
}); });
//Инициализируем продюсера //Инициализируем продюсера
@ -43,13 +106,20 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
let bLogLostConnection = true; let bLogLostConnection = true;
//Получаем брокера по URL сервиса //Получаем брокера по URL сервиса
let sBroker = getKafkaBroker(service.sSrvRoot); let sBroker = getKafkaBroker(service.sSrvRoot);
//Иницализируем подключение к Kafka //Формируем свойства подключения к Kafka
let client = new Kafka({ let clientProps = {
clientId: settings.sClientIdRecipient, clientId: settings.sClientIdRecipient,
brokers: [sBroker], brokers: [sBroker],
connectionTimeout: settings.nConnectionTimeout, connectionTimeout: settings.nConnectionTimeout,
...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings), ...getKafkaAuth(service.sSrvUser, service.sSrvPass, settings),
logLevel: logLevel.NOTHING, logLevel: logLevel[settings.sLogLevel] || logLevel.NOTHING,
logCreator: KafkaLogger(logger)
};
//Проверка доступности группы
await checkGroupAvailable(clientProps, settings.sGroupId);
//Иницализируем подключение к Kafka
let client = new Kafka({
...clientProps,
retry: { retry: {
retries: 0, retries: 0,
maxRetryTime: settings.nMaxRetryTime, maxRetryTime: settings.nMaxRetryTime,
@ -59,7 +129,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Если требуется вывести ошибку //Если требуется вывести ошибку
if (bLogLostConnection) { if (bLogLostConnection) {
//Выводим ошибку //Выводим ошибку
logger.error(`Соединение с Kafka потеряно (${sBroker}): ${makeErrorText(error)}`); logger.error(`${SERR_KAFKA}: Соединение потеряно (${sBroker}): ${makeErrorText(error)}`);
//Сбрасываем признак необходимости вывода ошибки //Сбрасываем признак необходимости вывода ошибки
bLogLostConnection = false; bLogLostConnection = false;
} }
@ -90,7 +160,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
}) })
}); });
} catch (e) { } catch (e) {
await logger.error(`Ошибка обработки входящего сообщения Kafka: ${makeErrorText(e)}`); await logger.error(`${SERR_KAFKA}: Ошибка обработки входящего сообщения: ${makeErrorText(e)}`);
} }
} }
}); });
@ -99,7 +169,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Если сообщение о потере соединения уже выводилось //Если сообщение о потере соединения уже выводилось
if (!bLogLostConnection) { if (!bLogLostConnection) {
//Сообщим о восстановлении соединения //Сообщим о восстановлении соединения
logger.info(`Соединение с Kafka восстановлено (${sBroker})`); logger.info(`${SINFO_KAFKA}: Соединение восстановлено (${sBroker})`);
//Устанавливаем признак сообщения о потере соединения //Устанавливаем признак сообщения о потере соединения
bLogLostConnection = true; bLogLostConnection = true;
} }
@ -107,7 +177,13 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Возвращаем соединение //Возвращаем соединение
return consumer; return consumer;
} catch (e) { } catch (e) {
await logger.error(`Ошибка запуска обработчика очереди входящих сообщений Kafka: ${makeErrorText(e)}`); //Если это фатальная ошибка - выдаем её
if (e.sCode === SERR_KAFKA_GROUP_UNAVAILABLE) {
throw new ServerError(e.sCode, e.sMessage);
} else {
//Если ошибка не фатальная - выводим информацию
await logger.error(`${SERR_KAFKA}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
}
} }
}; };

View File

@ -7,14 +7,9 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
const _ = require("lodash"); //Работа с массивами и объектами const { validateObject, getNowString, deepClone } = require("./utils"); //Вспомогательные функции
const { validateObject, getNowString } = require("./utils"); //Вспомогательные функции
const db = require("./db_connector"); //Модуль взаимодействия с БД const db = require("./db_connector"); //Модуль взаимодействия с БД
const { const { SCONSOLE_LOG_COLOR_PATTERN_ERR, SCONSOLE_LOG_COLOR_PATTERN_WRN, SCONSOLE_LOG_COLOR_PATTERN_INF } = require("./constants"); //Общие константы
SCONSOLE_LOG_COLOR_PATTERN_ERR,
SCONSOLE_LOG_COLOR_PATTERN_WRN,
SCONSOLE_LOG_COLOR_PATTERN_INF
} = require("./constants"); //Общие константы
const { NLOG_STATE_INF, NLOG_STATE_WRN, NLOG_STATE_ERR } = require("../models/obj_log"); //Схемы валидации записи журнала работы сервиса обмена const { NLOG_STATE_INF, NLOG_STATE_WRN, NLOG_STATE_ERR } = require("../models/obj_log"); //Схемы валидации записи журнала работы сервиса обмена
const prmsLoggerSchema = require("../models/prms_logger"); //Схемы валидации параметров функций модуля const prmsLoggerSchema = require("../models/prms_logger"); //Схемы валидации параметров функций модуля
@ -97,8 +92,7 @@ class Logger {
//Протоколирование ошибки //Протоколирование ошибки
async error(sMsg, prms) { async error(sMsg, prms) {
//Подготовим параметры для протоколирования //Подготовим параметры для протоколирования
let logData = {}; let logData = prms ? deepClone(prms) : {};
if (prms) logData = _.cloneDeep(prms);
//Выставим сообщение и тип записи журнала //Выставим сообщение и тип записи журнала
logData.nLogState = NLOG_STATE_ERR; logData.nLogState = NLOG_STATE_ERR;
logData.sMsg = sMsg; logData.sMsg = sMsg;
@ -108,8 +102,7 @@ class Logger {
//Протоколирование предупреждения //Протоколирование предупреждения
async warn(sMsg, prms) { async warn(sMsg, prms) {
//Подготовим параметры для протоколирования //Подготовим параметры для протоколирования
let logData = {}; let logData = prms ? deepClone(prms) : {};
if (prms) logData = _.cloneDeep(prms);
//Выставим сообщение и тип записи журнала //Выставим сообщение и тип записи журнала
logData.nLogState = NLOG_STATE_WRN; logData.nLogState = NLOG_STATE_WRN;
logData.sMsg = sMsg; logData.sMsg = sMsg;
@ -119,8 +112,7 @@ class Logger {
//Протоколирование информации //Протоколирование информации
async info(sMsg, prms) { async info(sMsg, prms) {
//Подготовим параметры для протоколирования //Подготовим параметры для протоколирования
let logData = {}; let logData = prms ? deepClone(prms) : {};
if (prms) logData = _.cloneDeep(prms);
//Выставим сообщение и тип записи журнала //Выставим сообщение и тип записи журнала
logData.nLogState = NLOG_STATE_INF; logData.nLogState = NLOG_STATE_INF;
logData.sMsg = sMsg; logData.sMsg = sMsg;

View File

@ -9,13 +9,22 @@
const { makeErrorText } = require("./utils"); //Вспомогательные функции const { makeErrorText } = require("./utils"); //Вспомогательные функции
const mqtt = require("mqtt"); //Работа с MQTT const mqtt = require("mqtt"); //Работа с MQTT
const { SERR_MQTT, SINFO_MQTT } = require("./constants"); //Глобальные константы
//----------
// Константы
//----------
//Общие константы работы MQTT
const SLOG_ERROR = "ERROR"; //Уровень протоколирования - ошибки
const SLOG_INFO = "INFO"; //Уровень протоколирования - информация
//------------ //------------
// Тело модуля // Тело модуля
//------------ //------------
//Отправка MQTT сообщения //Отправка MQTT сообщения
const publishMQTT = async ({ settings, url, auth, topic, message }) => { const publishMQTT = async ({ settings, url, auth, topic, message, logger }) => {
//Инициализируем подключение //Инициализируем подключение
const client = await mqtt.connectAsync(url, { const client = await mqtt.connectAsync(url, {
clientId: settings.sClientIdSender, clientId: settings.sClientIdSender,
@ -25,6 +34,14 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => {
password: auth.pass, password: auth.pass,
reconnectPeriod: settings.nReconnectPeriod reconnectPeriod: settings.nReconnectPeriod
}); });
//Прослушиваем ошибки
client.on("error", e => {
//Если требуется выдавать ошибку
if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) {
//Выводим ошибку
logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`);
}
});
//Отправляем сообщение //Отправляем сообщение
await client.publishAsync(topic, message); await client.publishAsync(topic, message);
//Закрываем подключение //Закрываем подключение
@ -64,18 +81,32 @@ const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
}); });
//Прослушиваем отключение от сервера //Прослушиваем отключение от сервера
client.on("offline", () => { client.on("offline", () => {
//Если требуется выдавать ошибку
if (settings.sLogLevel === SLOG_INFO) {
//Выводим ошибку //Выводим ошибку
logger.error(`Соединение с MQTT потеряно (${sBroker})`); logger.error(`${SERR_MQTT}: Соединение потеряно (${sBroker})`);
}
}); });
//Прослушиваем восстановление соединения //Прослушиваем восстановление соединения
client.on("connect", () => { client.on("connect", () => {
//Если требуется выдавать предупреждение
if (settings.sLogLevel === SLOG_INFO) {
//Сообщим о восстановлении соединения //Сообщим о восстановлении соединения
logger.info(`Соединение с MQTT восстановлено (${sBroker})`); logger.info(`${SINFO_MQTT}: Соединение восстановлено (${sBroker})`);
}
});
//Прослушиваем ошибки
client.on("error", e => {
//Если требуется выдавать ошибку
if ([SLOG_ERROR, SLOG_INFO].includes(settings.sLogLevel)) {
//Выводим ошибку
logger.error(`${SERR_MQTT}: ${makeErrorText(e)}`);
}
}); });
//Возвращаем подключение //Возвращаем подключение
return client; return client;
} catch (e) { } catch (e) {
logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`); logger.error(`${SERR_MQTT}: Ошибка запуска обработчика очереди входящих сообщений: ${makeErrorText(e)}`);
} }
}; };

View File

@ -7,11 +7,10 @@
// Подключение внешних библиотек // Подключение внешних библиотек
//------------------------------ //------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий const EventEmitter = require("events"); //Обработчик пользовательских событий
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_OBJECT_BAD_INTERFACE, SERR_MAIL_FAILED } = require("./constants"); //Общесистемные константы const { SERR_OBJECT_BAD_INTERFACE, SERR_MAIL_FAILED } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject, sendMail } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject, sendMail, deepClone } = require("./utils"); //Вспомогательные функции
const prmsNotifierSchema = require("../models/prms_notifier"); //Схемы валидации параметров функций класса const prmsNotifierSchema = require("../models/prms_notifier"); //Схемы валидации параметров функций класса
//-------------------------- //--------------------------
@ -65,21 +64,15 @@ class Notifier extends EventEmitter {
//Добавление уведомления в очередь отправки //Добавление уведомления в очередь отправки
async addMessage(prms) { async addMessage(prms) {
//Проверяем структуру переданного объекта для старта //Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject( let sCheckResult = validateObject(prms, prmsNotifierSchema.addMessage, "Параметры функции добавления уведомления в очередь отправки");
prms,
prmsNotifierSchema.addMessage,
"Параметры функции добавления уведомления в очередь отправки"
);
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
let tmp = _.cloneDeep(prms); let tmp = deepClone(prms);
tmp.bSent = false; tmp.bSent = false;
this.messages.push(tmp); this.messages.push(tmp);
} else { } else {
await this.logger.error( await this.logger.error(
`Ошибка добавления уведомления в очередь: ${makeErrorText( `Ошибка добавления уведомления в очередь: ${makeErrorText(new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult))}`
new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult)
)}`
); );
} }
} }
@ -113,7 +106,13 @@ class Notifier extends EventEmitter {
if (!message.bSent) { if (!message.bSent) {
try { try {
//Если всё в порядке с настройками //Если всё в порядке с настройками
if (this.mail.sHost && this.mail.nPort && this.mail.sFrom && this.mail.hasOwnProperty('bSecure') && this.mail.hasOwnProperty('bRejectUnauthorized')) { if (
this.mail.sHost &&
this.mail.nPort &&
this.mail.sFrom &&
this.mail.hasOwnProperty("bSecure") &&
this.mail.hasOwnProperty("bRejectUnauthorized")
) {
//Отправляем //Отправляем
await sendMail({ await sendMail({
mail: this.mail, mail: this.mail,
@ -135,16 +134,12 @@ class Notifier extends EventEmitter {
); );
} }
} catch (e) { } catch (e) {
await this.logger.error( await this.logger.error(`Ошибка отправки сообщения с темой "${message.sSubject}" для ${message.sTo}: ${makeErrorText(e)}`);
`Ошибка отправки сообщения с темой "${message.sSubject}" для ${message.sTo}: ${makeErrorText(
e
)}`
);
} }
} }
} }
//Подчищаем очередь - удалим уже отправленные //Подчищаем очередь - удалим уже отправленные
_.remove(this.messages, { bSent: true }); this.messages = this.messages.filter(msg => !msg.bSent);
//Выставим флаг - цикл опроса неактивен //Выставим флаг - цикл опроса неактивен
this.bInSendLoop = false; this.bInSendLoop = false;
//Перезапускаем опрос //Перезапускаем опрос

View File

@ -7,12 +7,11 @@
// Подключение внешних библиотек // Подключение внешних библиотек
//------------------------------ //------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий const EventEmitter = require("events"); //Обработчик пользовательских событий
const ChildProcess = require("child_process"); //Работа с дочерними процессами const ChildProcess = require("child_process"); //Работа с дочерними процессами
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject, deepClone } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const { NFORCE_YES } = require("../models/common"); //Общие константы и схемы валидации const { NFORCE_YES } = require("../models/common"); //Общие константы и схемы валидации
@ -53,7 +52,7 @@ class OutQueue extends EventEmitter {
//Признак функционирования обработчика //Признак функционирования обработчика
this.bWorking = false; this.bWorking = false;
//Параметры очереди //Параметры очереди
this.outGoing = _.cloneDeep(prms.outGoing); this.outGoing = deepClone(prms.outGoing);
//Количество доступных обработчиков //Количество доступных обработчиков
this.nWorkersLeft = this.outGoing.nMaxWorkers; this.nWorkersLeft = this.outGoing.nMaxWorkers;
//Идентификатор таймера проверки очереди //Идентификатор таймера проверки очереди
@ -150,6 +149,8 @@ class OutQueue extends EventEmitter {
if (!sCheckResult) { if (!sCheckResult) {
//Добавляем идентификатор позиции очереди в список обрабатываемых //Добавляем идентификатор позиции очереди в список обрабатываемых
this.addInProgress({ nQueueId: prms.queue.nId }); this.addInProgress({ nQueueId: prms.queue.nId });
//Найдем сервис
const service = this.services.find(s => s.nId === prms.queue.nServiceId);
//Отдаём команду дочернему процессу обработчика на старт исполнения //Отдаём команду дочернему процессу обработчика на старт исполнения
prms.proc.send({ prms.proc.send({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
@ -159,10 +160,8 @@ class OutQueue extends EventEmitter {
nPoolMax: this.outGoing.nPoolMax, nPoolMax: this.outGoing.nPoolMax,
nPoolIncrement: this.outGoing.nPoolIncrement nPoolIncrement: this.outGoing.nPoolIncrement
}, },
service: _.find(this.services, { nId: prms.queue.nServiceId }), service: service,
function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { function: service?.functions.find(f => f.nId === prms.queue.nServiceFnId),
nId: prms.queue.nServiceFnId
}),
sProxy: this.sProxy, sProxy: this.sProxy,
kafka: this.kafka, kafka: this.kafka,
mqtt: this.mqtt mqtt: this.mqtt
@ -201,10 +200,8 @@ class OutQueue extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Найдем сервис и функцию, исполнявшие данное сообщение //Найдем сервис и функцию, исполнявшие данное сообщение
let service = _.find(this.services, { nId: prms.queue.nServiceId }); let service = this.services.find(s => s.nId === prms.queue.nServiceId);
let func = _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { let func = service?.functions.find(f => f.nId === prms.queue.nServiceFnId);
nId: prms.queue.nServiceFnId
});
//Если нашли и для функции-обработчика указан признак необходимости оповещения об ошибках //Если нашли и для функции-обработчика указан признак необходимости оповещения об ошибках
if (service && func && func.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) if (service && func && func.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES)
//Отправим уведомление об ошибке отработки в почту //Отправим уведомление об ошибке отработки в почту

View File

@ -7,9 +7,7 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
require("module-alias/register"); //Поддержка псевонимов при подключении модулей const { httpRequest } = require("./http_client"); //Работа с HTTP/HTTPS запросами
const _ = require("lodash"); //Работа с массивами и объектами
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const lg = require("./logger"); //Протоколирование работы const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД const db = require("./db_connector"); //Взаимодействие с БД
const { const {
@ -20,6 +18,7 @@ const {
parseOptionsXML, parseOptionsXML,
buildOptionsXML, buildOptionsXML,
deepMerge, deepMerge,
deepClone,
getKafkaConnectionSettings, getKafkaConnectionSettings,
getMQTTConnectionSettings, getMQTTConnectionSettings,
getKafkaBroker, getKafkaBroker,
@ -137,9 +136,9 @@ const appProcess = async prms => {
//Флаг установленности контекста для функции начала сеанса //Флаг установленности контекста для функции начала сеанса
let bCtxIsSet = false; let bCtxIsSet = false;
//Кладём данные тела в объект сообщения и инициализируем поле для ответа //Кладём данные тела в объект сообщения и инициализируем поле для ответа
_.extend(prms.queue, { blMsg: qData.blMsg, blResp: null }); Object.assign(prms.queue, { blMsg: qData.blMsg, blResp: null });
//Кладём данные контекста в сервис //Кладём данные контекста в сервис
_.extend(prms.service, serviceCtx); Object.assign(prms.service, serviceCtx);
//Собираем параметры для передачи серверу //Собираем параметры для передачи серверу
let options = { method: prms.function.sFnPrmsType, encoding: null }; let options = { method: prms.function.sFnPrmsType, encoding: null };
//Инициализируем параметры ответа сервера //Инициализируем параметры ответа сервера
@ -238,8 +237,8 @@ const appProcess = async prms => {
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);
let resBefore = null; let resBefore = null;
try { try {
let resBeforePrms = _.cloneDeep(prms); let resBeforePrms = deepClone(prms);
resBeforePrms.options = _.cloneDeep(options); resBeforePrms.options = deepClone(options);
resBeforePrms.dbConn = dbConn; resBeforePrms.dbConn = dbConn;
resBefore = await fnBefore(resBeforePrms); resBefore = await fnBefore(resBeforePrms);
} catch (e) { } catch (e) {
@ -255,7 +254,7 @@ const appProcess = async prms => {
//Если структура ответа в норме //Если структура ответа в норме
if (!sCheckResult) { if (!sCheckResult) {
//Применим ответ "До" - обработанное сообщение очереди //Применим ответ "До" - обработанное сообщение очереди
if (!_.isUndefined(resBefore.blMsg)) { if (resBefore.blMsg !== undefined) {
prms.queue.blMsg = resBefore.blMsg; prms.queue.blMsg = resBefore.blMsg;
await dbConn.setQueueMsg({ await dbConn.setQueueMsg({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
@ -281,14 +280,14 @@ const appProcess = async prms => {
} }
} }
//Применим ответ "До" - параметры отправки сообщения удаленному серверу //Применим ответ "До" - параметры отправки сообщения удаленному серверу
if (!_.isUndefined(resBefore.options)) options = deepMerge(options, resBefore.options); if (resBefore.options !== undefined) options = deepMerge(options, resBefore.options);
//Применим ответ "До" - флаг отсуствия аутентификации //Применим ответ "До" - флаг отсуствия аутентификации
if (!_.isUndefined(resBefore.bUnAuth)) if (resBefore.bUnAuth !== undefined)
if (resBefore.bUnAuth === true) { if (resBefore.bUnAuth === true) {
throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
} }
//Применим ответ "До" - контекст работы сервиса //Применим ответ "До" - контекст работы сервиса
if (!_.isUndefined(resBefore.sCtx)) if (resBefore.sCtx !== undefined)
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) { if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) {
prms.service.sCtx = resBefore.sCtx; prms.service.sCtx = resBefore.sCtx;
prms.service.dCtxExp = resBefore.dCtxExp; prms.service.dCtxExp = resBefore.dCtxExp;
@ -300,7 +299,7 @@ const appProcess = async prms => {
bCtxIsSet = true; bCtxIsSet = true;
} }
//Применим ответ "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем //Применим ответ "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; if (resBefore.bStopPropagation !== undefined && resBefore.bStopPropagation === true) bStopPropagation = true;
} else { } else {
//Или расскажем об ошибке в структуре ответа //Или расскажем об ошибке в структуре ответа
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -317,7 +316,7 @@ const appProcess = async prms => {
try { try {
//Сохраняем параметры с которыми уходило сообщение //Сохраняем параметры с которыми уходило сообщение
try { try {
let tmpOptions = _.cloneDeep(options); let tmpOptions = deepClone(options);
//Исключим из параметров заведомо бинарные поля (их сохранение не предусмотрено) //Исключим из параметров заведомо бинарные поля (их сохранение не предусмотрено)
delete tmpOptions.body; delete tmpOptions.body;
delete tmpOptions.cert; delete tmpOptions.cert;
@ -364,8 +363,13 @@ const appProcess = async prms => {
if (prms.function.nTimeoutConn && !options.timeout) options.timeout = prms.function.nTimeoutConn; if (prms.function.nTimeoutConn && !options.timeout) options.timeout = prms.function.nTimeoutConn;
//Отправляем запрос //Отправляем запрос
serverResp = prms.function.nTimeoutAsynch serverResp = prms.function.nTimeoutAsynch
? await wrapPromiseTimeout(prms.function.nTimeoutAsynch, rqp(options)) ? await wrapPromiseTimeout(prms.function.nTimeoutAsynch, signal =>
: await rqp(options); httpRequest({
...options,
signal
})
)
: await httpRequest(options);
break; break;
} }
//Сохраняем полученный ответ //Сохраняем полученный ответ
@ -376,7 +380,7 @@ const appProcess = async prms => {
nIsOriginal: NIS_ORIGINAL_YES nIsOriginal: NIS_ORIGINAL_YES
}); });
//Сохраняем заголовки ответа и HTTP-статус //Сохраняем заголовки ответа и HTTP-статус
optionsResp.headers = _.cloneDeep(serverResp.headers); optionsResp.headers = deepClone(serverResp.headers);
optionsResp.statusCode = serverResp.statusCode; optionsResp.statusCode = serverResp.statusCode;
try { try {
let sOptionsResp = buildOptionsXML({ options: optionsResp }); let sOptionsResp = buildOptionsXML({ options: optionsResp });
@ -401,9 +405,9 @@ const appProcess = async prms => {
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
let resAfter = null; let resAfter = null;
try { try {
let resAfterPrms = _.cloneDeep(prms); let resAfterPrms = deepClone(prms);
resAfterPrms.options = _.cloneDeep(options); resAfterPrms.options = deepClone(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp); resAfterPrms.optionsResp = deepClone(optionsResp);
resAfter = await fnAfter(resAfterPrms); resAfter = await fnAfter(resAfterPrms);
} catch (e) { } catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message); throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
@ -418,7 +422,7 @@ const appProcess = async prms => {
//Если структура ответа в норме //Если структура ответа в норме
if (!sCheckResult) { if (!sCheckResult) {
//Применим ответ "После" - обработанный ответ удаленного сервиса //Применим ответ "После" - обработанный ответ удаленного сервиса
if (!_.isUndefined(resAfter.blResp)) { if (resAfter.blResp !== undefined) {
prms.queue.blResp = resAfter.blResp; prms.queue.blResp = resAfter.blResp;
await dbConn.setQueueResp({ await dbConn.setQueueResp({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
@ -427,10 +431,10 @@ const appProcess = async prms => {
}); });
} }
//Применим ответ "После" - флаг утентификации сервиса //Применим ответ "После" - флаг утентификации сервиса
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth !== undefined)
if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
//Применим ответ "После" - контекст работы сервиса //Применим ответ "После" - контекст работы сервиса
if (!_.isUndefined(resAfter.sCtx)) if (resAfter.sCtx !== undefined)
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) { if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) {
prms.service.sCtx = resAfter.sCtx; prms.service.sCtx = resAfter.sCtx;
prms.service.dCtxExp = resAfter.dCtxExp; prms.service.dCtxExp = resAfter.dCtxExp;

View File

@ -7,12 +7,11 @@
// Подключение внешних библиотек // Подключение внешних библиотек
//------------------------------ //------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями const httpRequest = require("./http_client"); //Работа с HTTP/HTTPS запросами
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const EventEmitter = require("events"); //Обработчик пользовательских событий const EventEmitter = require("events"); //Обработчик пользовательских событий
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_SERVICE_UNAVAILABLE, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы const { SERR_SERVICE_UNAVAILABLE, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject, deepClone } = require("./utils"); //Вспомогательные функции
const prmsServiceAvailableControllerSchema = require("../models/prms_service_available_controller"); //Схемы валидации параметров функций класса const prmsServiceAvailableControllerSchema = require("../models/prms_service_available_controller"); //Схемы валидации параметров функций класса
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервисов const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервисов
@ -107,10 +106,7 @@ class ServiceAvailableController extends EventEmitter {
//Обходим список сервисов для проверки //Обходим список сервисов для проверки
for (let service of this.services) { for (let service of this.services) {
//Если сервис надо проверять на доступность и это сервис для отправки исходящих сообщений //Если сервис надо проверять на доступность и это сервис для отправки исходящих сообщений
if ( if (service.nUnavlblNtfSign == objServiceSchema.NUNAVLBL_NTF_SIGN_YES && service.nSrvType == objServiceSchema.NSRV_TYPE_SEND) {
service.nUnavlblNtfSign == objServiceSchema.NUNAVLBL_NTF_SIGN_YES &&
service.nSrvType == objServiceSchema.NSRV_TYPE_SEND
) {
try { try {
// Инициализируем параметры запроса // Инициализируем параметры запроса
let options = {}; let options = {};
@ -123,7 +119,7 @@ class ServiceAvailableController extends EventEmitter {
options.proxy = service.sProxyURL ?? this.sProxy; options.proxy = service.sProxyURL ?? this.sProxy;
} }
//Отправляем проверочный запрос //Отправляем проверочный запрос
await rqp(options); await httpRequest(options);
//Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности //Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности
service.dAvailable = new Date(); service.dAvailable = new Date();
service.dUnAvailable = null; service.dUnAvailable = null;
@ -193,10 +189,9 @@ class ServiceAvailableController extends EventEmitter {
}); });
} }
} catch (e) { } catch (e) {
await this.logger.error( await this.logger.error(`При проверке просроченных сообщений сервиса ${service.sCode}: ${makeErrorText(e)}`, {
`При проверке просроченных сообщений сервиса ${service.sCode}: ${makeErrorText(e)}`, nServiceId: service.nId
{ nServiceId: service.nId } });
);
} }
} }
} }
@ -232,7 +227,7 @@ class ServiceAvailableController extends EventEmitter {
//Выставляем флаг неактивности (пока) цикла опроса //Выставляем флаг неактивности (пока) цикла опроса
this.bInDetectingLoop = false; this.bInDetectingLoop = false;
//запоминаем список обслуживаемых сервисов и инициализируем даты доступности и недоступности //запоминаем список обслуживаемых сервисов и инициализируем даты доступности и недоступности
this.services = _.cloneDeep(prms.services); this.services = deepClone(prms.services);
this.services.forEach(s => { this.services.forEach(s => {
s.dUnAvailable = null; s.dUnAvailable = null;
s.dAvailable = new Date(); s.dAvailable = new Date();

View File

@ -8,7 +8,6 @@
//---------------------- //----------------------
const fs = require("fs"); //Работа с файлами const fs = require("fs"); //Работа с файлами
const _ = require("lodash"); //Работа с массивами и объектами
const os = require("os"); //Средства операционной системы const os = require("os"); //Средства операционной системы
const xml2js = require("xml2js"); //Конвертация XML в JSON const xml2js = require("xml2js"); //Конвертация XML в JSON
const Schema = require("validate"); //Схемы валидации const Schema = require("validate"); //Схемы валидации
@ -29,6 +28,92 @@ const { SPROTOCOL_HTTP, SPROTOCOL_KAFKA } = require("../models/obj_service"); //
// Тело модуля // Тело модуля
//------------ //------------
//Глубокое клонирование
const deepClone = value => {
//Клонируем объект в зависимости от его типа, сохраняя прототипы
const seen = new WeakMap();
const clone = val => {
//Примитивы и функции возвращаем как есть
if (val === null || typeof val !== "object") return val;
if (typeof val === "function") return val;
//Защита от циклических ссылок
if (seen.has(val)) return seen.get(val);
//Специализированные типы
if (val instanceof Date) return new Date(val);
if (Buffer.isBuffer(val)) return Buffer.from(val);
if (ArrayBuffer.isView(val)) return new val.constructor(val);
if (val instanceof RegExp) return new RegExp(val);
//Ошибки: сохраняем тип, сообщение, стек и пользовательские поля
if (val instanceof Error) {
const clonedError = new val.constructor(val.message);
seen.set(val, clonedError);
clonedError.name = val.name;
clonedError.stack = val.stack;
Reflect.ownKeys(val).forEach(key => {
if (key === "name" || key === "message" || key === "stack") return;
const desc = Object.getOwnPropertyDescriptor(val, key);
if (!desc) return;
if ("value" in desc) desc.value = clone(desc.value);
try {
Object.defineProperty(clonedError, key, desc);
} catch (e) {}
});
return clonedError;
}
//Коллекции
if (val instanceof Map) {
const m = new Map();
seen.set(val, m);
val.forEach((v, k) => m.set(clone(k), clone(v)));
return m;
}
if (val instanceof Set) {
const s = new Set();
seen.set(val, s);
val.forEach(v => s.add(clone(v)));
return s;
}
//Коллекции, которые невозможно корректно клонировать - возвращаем по ссылке
if (val instanceof WeakMap || val instanceof WeakSet || val instanceof Promise) {
return val;
}
//Массивы
if (Array.isArray(val)) {
const arr = new Array(val.length);
seen.set(val, arr);
for (let i = 0; i < val.length; i++) {
if (Object.prototype.hasOwnProperty.call(val, i)) {
arr[i] = clone(val[i]);
}
}
return arr;
}
//Общие объекты и пользовательские классы: сохраняем прототип и дескрипторы свойств
const proto = Object.getPrototypeOf(val);
const obj = Object.create(proto);
seen.set(val, obj);
Reflect.ownKeys(val).forEach(key => {
const desc = Object.getOwnPropertyDescriptor(val, key);
if (!desc) return;
if ("value" in desc) {
desc.value = clone(desc.value);
}
try {
Object.defineProperty(obj, key, desc);
} catch (e) {}
});
return obj;
};
try {
return clone(value);
} catch (e) {
//В случае непредвиденной ошибки формируем информативное исключение
const err = new Error("Ошибка глубокого копирования объекта");
err.originalError = e;
throw err;
}
};
//Валидация объекта //Валидация объекта
const validateObject = (obj, schema, sObjName) => { const validateObject = (obj, schema, sObjName) => {
//Объявим результат //Объявим результат
@ -38,7 +123,7 @@ const validateObject = (obj, schema, sObjName) => {
//И есть что проверять //И есть что проверять
if (obj) { if (obj) {
//Сделаем это //Сделаем это
const objTmp = _.cloneDeep(obj); const objTmp = deepClone(obj);
const errors = schema.validate(objTmp, { strip: false }); const errors = schema.validate(objTmp, { strip: false });
//Если есть ошибки //Если есть ошибки
if (errors && Array.isArray(errors)) { if (errors && Array.isArray(errors)) {
@ -47,7 +132,7 @@ const validateObject = (obj, schema, sObjName) => {
let a = errors.map(e => { let a = errors.map(e => {
return e.message; return e.message;
}); });
sRes = `Объект${sObjName ? ` "${sObjName}" ` : " "}имеет некорректный формат: ${_.uniq(a).join("; ")}`; sRes = `Объект${sObjName ? ` "${sObjName}" ` : " "}имеет некорректный формат: ${Array.from(new Set(a)).join("; ")}`;
} }
} else { } else {
//Валидатор вернул не то, что мы ожидали //Валидатор вернул не то, что мы ожидали
@ -322,11 +407,30 @@ const getNowString = () => {
}; };
//Глубокое слияние объектов //Глубокое слияние объектов
const deepMerge = (...args) => { function deepMerge(...sources) {
let res = {}; const isPlainObject = value => Object.prototype.toString.call(value) === "[object Object]";
for (let i = 0; i < args.length; i++) _.merge(res, args[i]);
return res; const cloneValue = value => {
}; return deepClone(value);
};
const target = {};
for (const source of sources) {
if (!isPlainObject(source)) continue;
for (const [key, value] of Object.entries(source)) {
if (isPlainObject(value)) {
if (!isPlainObject(target[key])) target[key] = {};
target[key] = deepMerge(target[key], value);
} else {
target[key] = cloneValue(value);
}
}
}
return target;
}
//Глубокое копирование объекта //Глубокое копирование объекта
const deepCopyObject = obj => JSON.parse(JSON.stringify(obj)); const deepCopyObject = obj => JSON.parse(JSON.stringify(obj));
@ -403,18 +507,29 @@ const getURLProtocol = sURL => {
return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1); return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1);
}; };
//Обёртывание промиса в таймаут исполнения //Wraps async task with timeout and abort support
const wrapPromiseTimeout = (timeout, promise, promiseCancellable = true) => { const wrapPromiseTimeout = (timeout, executor) => {
if (!timeout) return promise; if (!timeout || typeof executor !== "function") {
let timeoutPid; return executor ? executor() : Promise.resolve();
const timeoutPromise = new Promise((resolve, reject) => { }
const controller = new AbortController();
const sMessage = `Истёк интервал ожидания (${timeout} мс) завершения асинхронного процесса.`; const sMessage = `Истёк интервал ожидания (${timeout} мс) завершения асинхронного процесса.`;
let e = new Error(sMessage); const timeoutError = new Error(sMessage);
e.error = sMessage; timeoutError.error = sMessage;
timeoutPid = setTimeout(() => reject(e), timeout);
let timeoutPid;
const timeoutPromise = new Promise((_, reject) => {
timeoutPid = setTimeout(() => {
controller.abort(timeoutError);
reject(timeoutError);
}, timeout);
}); });
return Promise.race([promise, timeoutPromise]).finally(() => {
if (promiseCancellable && promise.promise().isPending()) promise.cancel(); const taskPromise = Promise.resolve(executor(controller.signal));
return Promise.race([taskPromise, timeoutPromise]).finally(() => {
if (timeoutPid) clearTimeout(timeoutPid); if (timeoutPid) clearTimeout(timeoutPid);
}); });
}; };
@ -437,6 +552,7 @@ exports.parseOptionsXML = parseOptionsXML;
exports.buildOptionsXML = buildOptionsXML; exports.buildOptionsXML = buildOptionsXML;
exports.getNowString = getNowString; exports.getNowString = getNowString;
exports.deepMerge = deepMerge; exports.deepMerge = deepMerge;
exports.deepClone = deepClone;
exports.deepCopyObject = deepCopyObject; exports.deepCopyObject = deepCopyObject;
exports.isUndefined = isUndefined; exports.isUndefined = isUndefined;
exports.getKafkaConnectionSettings = getKafkaConnectionSettings; exports.getKafkaConnectionSettings = getKafkaConnectionSettings;

View File

@ -7,7 +7,6 @@
// Подключение библиотек // Подключение библиотек
//---------------------- //----------------------
require("module-alias/register"); //Поддержка псевонимов при подключении модулей
const cfg = require("./config"); //Настройки сервера приложений const cfg = require("./config"); //Настройки сервера приложений
const app = require("./core/app"); //Сервер приложений const app = require("./core/app"); //Сервер приложений
const { makeErrorText, getNowString } = require("./core/utils"); //Вспомогательные функции const { makeErrorText, getNowString } = require("./core/utils"); //Вспомогательные функции
@ -29,11 +28,7 @@ process.env.NODE_TLS_REJECT_UNAUTHORIZED = cfg.outGoing.bValidateSSL === false ?
//Обработка события "выход" жизненного цикла процесса //Обработка события "выход" жизненного цикла процесса
process.on("exit", code => { process.on("exit", code => {
//Сообщим о завершении процесса //Сообщим о завершении процесса
console.log( console.log(SCONSOLE_LOG_COLOR_PATTERN_WRN, `${getNowString()} ПРЕДУПРЕЖДЕНИЕ: `, `Сервер приложений остановлен (код: ${code})`);
SCONSOLE_LOG_COLOR_PATTERN_WRN,
`${getNowString()} ПРЕДУПРЕЖДЕНИЕ: `,
`Сервер приложений остановлен (код: ${code})`
);
}); });
//Обработка событий мягкого останова процесса //Обработка событий мягкого останова процесса

View File

@ -9,6 +9,21 @@
const Schema = require("validate"); //Схемы валидации const Schema = require("validate"); //Схемы валидации
//----------
// Константы
//----------
//Уровни протоколирования подключения Kafka
const SKAFKA_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено
const SKAFKA_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок
const SKAFKA_LOG_LEVEL_WARN = "WARN"; //Протоколирование предупреждений
const SKAFKA_LOG_LEVEL_INFO = "INFO"; //Протоколирование общей информации
//Уровни протоколирования подключения MQTT
const SMQTT_LOG_LEVEL_NOTHING = "NOTHING"; //Протоколирование отключено
const SMQTT_LOG_LEVEL_ERROR = "ERROR"; //Протоколирование ошибок
const SMQTT_LOG_LEVEL_INFO = "INFO"; //Протоколирование информации
//------------- //-------------
// Тело модуля // Тело модуля
//------------- //-------------
@ -52,6 +67,21 @@ const validatePoolIncrementInComing = val => val >= 0 && val <= 1000;
//Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений //Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений
const validateTimeoutInComing = val => val >= 0; const validateTimeoutInComing = val => val >= 0;
//Функция проверки значения времени ожидания успешного подключения Kafka
const validateTimeoutKafka = val => val >= 0;
//Функция проверки значения максимального ожидания между попытками переподключения Kafka
const validateMaxRetryTimeKafka = val => val >= 0;
//Функция проверки значения максимального ожидания между попытками переподключения Kafka
const validateInitialRetryTimeKafka = val => val >= 0;
//Функция проверки значения времени ожидания успешного подключения к MQTT
const validateConnectTimeoutMQTT = val => val >= 0;
//Функция проверки значения времени ожидания между попытками переподключения к MQTT
const validateReconnectPeriodMQTT = val => val >= 0;
//Схема валидации общих параметров сервера приложений //Схема валидации общих параметров сервера приложений
const common = new Schema({ const common = new Schema({
//Наименование сервера приложений //Наименование сервера приложений
@ -169,7 +199,8 @@ const outGoing = new Schema({
type: Boolean, type: Boolean,
required: true, required: true,
message: { message: {
type: path => `Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Number)`, type: path =>
`Признак проверки SSL-сертификатов адресов отправки сообщений (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
required: path => `Не указан признак проверки SSL-сертификатов адресов отправки сообщений (${path})` required: path => `Не указан признак проверки SSL-сертификатов адресов отправки сообщений (${path})`
} }
}, },
@ -340,6 +371,246 @@ const inComing = new Schema({
} }
}); });
//Схема валидации параметров SSL подключения к Kafka
const kafkaSSL = new Schema({
//Запрещать использование самоподписанных сертификатов (true - запретить, false - разрешить)
bRejectUnauthorized: {
type: Boolean,
required: true,
message: {
type: path =>
`Признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
required: path => `Не указан признак запрета использования самоподписанных сертификатов SSL подключения к Kafka (${path})`
}
},
//Путь к корневому сертификату с информацией об удостоверяющем центре
sPathCa: {
type: String,
required: false,
message: {
type: path => `Путь к корневому сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
}
},
//Путь к закрытому ключу
sPathKey: {
type: String,
required: false,
message: {
type: path => `Путь к закрытому ключу SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
}
},
//Путь к сертификату
sPathCert: {
type: String,
required: false,
message: {
type: path => `Путь к сертификату SSL подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
}
}
});
//Схема валидации параметров подключения Kafka
const kafka = new Schema({
//Мнемокод сервиса обмена
sService: {
type: String,
required: false,
message: {
type: path => `Мнемокод сервиса обмена подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`
}
},
//ID клиента-отправителя
sClientIdSender: {
type: String,
required: true,
message: {
type: path => `ID клиента-отправителя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан ID клиента-отправителя подключения Kafka (${path})`
}
},
//ID клиента-получателя
sClientIdRecipient: {
type: String,
required: true,
message: {
type: path => `ID клиента-получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан ID клиента-получателя подключения Kafka (${path})`
}
},
//Группа получателя
sGroupId: {
type: String,
required: true,
message: {
type: path => `Группа получателя подключения Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана группа получателя подключения Kafka (${path})`
}
},
//Время ожидания успешного подключения (мс)
nConnectionTimeout: {
type: Number,
required: true,
use: { validateTimeoutKafka },
message: {
type: path => `Время ожидания успешного подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано время ожидания успешного подключения к Kafka (${path})`,
validateTimeoutKafka: path => `Время ожидания успешного подключения Kafka (${path}) должно быть неотрицательным целым числом`
}
},
//Необходимость попытки переподключения при потере соединения
bRestartOnFailure: {
type: Boolean,
required: true,
message: {
type: path => `Признак необходимости попытки переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
required: path => `Не указан признак необходимости попытки переподключения к Kafka (${path})`
}
},
//Время максимального ожидания между попытками переподключения (мс)
nMaxRetryTime: {
type: Number,
required: true,
use: { validateMaxRetryTimeKafka },
message: {
type: path =>
`Время максимального ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано время максимального ожидания между попытками переподключения к Kafka (${path})`,
validateMaxRetryTimeKafka: path =>
`Время максимального ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом`
}
},
//Время ожидания между попытками переподключения (мс)
nInitialRetryTime: {
type: Number,
required: true,
use: { validateInitialRetryTimeKafka },
message: {
type: path => `Время ожидания между попытками переподключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано время ожидания между попытками переподключения к Kafka (${path})`,
validateInitialRetryTimeKafka: path =>
`Время ожидания между попытками переподключения к Kafka (${path}) должно быть неотрицательным целым числом`
}
},
//Уровень протоколирования подключения
sLogLevel: {
type: String,
enum: [SKAFKA_LOG_LEVEL_NOTHING, SKAFKA_LOG_LEVEL_ERROR, SKAFKA_LOG_LEVEL_WARN, SKAFKA_LOG_LEVEL_INFO],
required: true,
message: {
type: path => `Уровень протоколирования подключения к Kafka (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение уровня протоколирования подключения к Kafka (${path}) не поддерживается`,
required: path => `Не указан уровень протоколирования подключения к Kafka (${path})`
}
},
//Использовать аутентификацию по SSL-сертификату
bAuthSSL: {
type: Boolean,
required: true,
message: {
type: path => `Признак использования аутентификации по SSL к Kafka (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
required: path => `Не указан признак использования аутентификации по SSL к Kafka (${path})`
}
},
//Параметры аутентификации по SSL-сертификату
ssl: {
schema: kafkaSSL,
required: true,
message: {
required: path => `Не указаны параметры аутентификации по SSL к Kafka (${path})`
}
}
});
//Описатель схемы валидации подключения к Kafka
const defKafka = (bRequired, sName) => {
return {
type: Array,
required: bRequired,
each: kafka,
message: {
type: `Список подключений Kafka (${sName}) имеет некорректный тип данных (ожидалось - Array)`,
required: `Не указан список подключений Kafka (${sName})`
}
};
};
//Схема валидации параметров подключения MQTT
const mqtt = new Schema({
//Мнемокод сервиса обмена
sService: {
type: String,
required: false,
message: {
type: path => `Мнемокод сервиса обмена подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`
}
},
//ID клиента-отправителя
sClientIdSender: {
type: String,
required: true,
message: {
type: path => `ID клиента-отправителя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан ID клиента-отправителя подключения MQTT (${path})`
}
},
//ID клиента-получателя
sClientIdRecipient: {
type: String,
required: true,
message: {
type: path => `ID клиента-получателя подключения MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан ID клиента-получателя подключения MQTT (${path})`
}
},
//Время ожидания успешного подключения (мс)
nConnectTimeout: {
type: Number,
required: true,
use: { validateConnectTimeoutMQTT },
message: {
type: path => `Время ожидания успешного подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано время ожидания успешного подключения к MQTT (${path})`,
validateConnectTimeoutMQTT: path => `Время ожидания успешного подключения к MQTT (${path}) должно быть неотрицательным целым числом`
}
},
//Время ожидания между попытками переподключения (мс)
nReconnectPeriod: {
type: Number,
required: true,
use: { validateReconnectPeriodMQTT },
message: {
type: path => `Время ожидания между попытками переподключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано время ожидания между попытками переподключения к MQTT (${path})`,
validateReconnectPeriodMQTT: path =>
`Время ожидания между попытками переподключения к MQTT (${path}) должно быть неотрицательным целым числом`
}
},
//Уровень протоколирования подключения
sLogLevel: {
type: String,
enum: [SMQTT_LOG_LEVEL_NOTHING, SMQTT_LOG_LEVEL_ERROR, SMQTT_LOG_LEVEL_INFO],
required: true,
message: {
type: path => `Уровень протоколирования подключения к MQTT (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение уровня протоколирования подключения к MQTT (${path}) не поддерживается`,
required: path => `Не указан уровень протоколирования подключения к MQTT (${path})`
}
}
});
//Описатель схемы валидации подключения к MQTT
const defMQTT = (bRequired, sName) => {
return {
type: Array,
required: bRequired,
each: mqtt,
message: {
type: `Список подключений MQTT (${sName}) имеет некорректный тип данных (ожидалось - Array)`,
required: `Не указан список подключений MQTT (${sName})`
}
};
};
//Схема валидации параметров отправки E-Mail уведомлений //Схема валидации параметров отправки E-Mail уведомлений
const mail = new Schema({ const mail = new Schema({
//Адреc сервера SMTP //Адреc сервера SMTP
@ -441,6 +712,22 @@ const config = new Schema({
required: path => `Не указаны параметры обработки очереди входящих сообщений (${path})` required: path => `Не указаны параметры обработки очереди входящих сообщений (${path})`
} }
}, },
//Параметры подключения к Kafka
kafka: {
schema: defKafka(true, "kafka"),
required: true,
message: {
required: path => `Не указаны параметры подключения Kafka (${path})`
}
},
//Параметры подключения к MQTT
mqtt: {
schema: defMQTT(true, "mqtt"),
required: true,
message: {
required: path => `Не указаны параметры подключения MQTT (${path})`
}
},
//Параметры отправки E-Mail уведомлений //Параметры отправки E-Mail уведомлений
mail: { mail: {
schema: mail, schema: mail,

View File

@ -8,8 +8,7 @@
//------------------------------ //------------------------------
const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML
const _ = require("lodash"); //Работа с коллекциями и объектами const httpRequest = require("../core/http_client"); //Работа с HTTP/HTTPS запросами
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const config = require("../config"); //Параметры сервера const config = require("../config"); //Параметры сервера
const { SDDAUTH_API_CLIENT_ID, SDEPARTMENT_NAME, SDEPARTMENT_ID } = require("./diadoc_config"); //Ключ разработчика const { SDDAUTH_API_CLIENT_ID, SDEPARTMENT_NAME, SDEPARTMENT_ID } = require("./diadoc_config"); //Ключ разработчика
@ -62,7 +61,7 @@ const toArray = (obj, tags) => {
if (typeof value === "object") { if (typeof value === "object") {
obj[prop] = toArray(value, tag); obj[prop] = toArray(value, tag);
} }
if (tags.indexOf(prop) != -1 && !_.isArray(obj[prop])) { if (tags.indexOf(prop) != -1 && !Array.isArray(obj[prop])) {
obj[prop] = JSON.parse("[" + JSON.stringify(value) + "]"); obj[prop] = JSON.parse("[" + JSON.stringify(value) + "]");
} }
} }
@ -143,12 +142,12 @@ const getOrganizations = organizations => {
//Итоговая выборка //Итоговая выборка
let organization = { Organizations: [] }; let organization = { Organizations: [] };
//Найдем активную организацию не в роуминге //Найдем активную организацию не в роуминге
organization.Organizations[0] = organizations.Organizations.find(org => (org.IsRoaming === isRoaming) && (org.IsActive === isActive)); organization.Organizations[0] = organizations.Organizations.find(org => org.IsRoaming === isRoaming && org.IsActive === isActive);
//Если не удалось получить организацию не в роуминге //Если не удалось получить организацию не в роуминге
if (!organization.Organizations[0]) { if (!organization.Organizations[0]) {
//Найдем активную организацию //Найдем активную организацию
organization.Organizations[0] = organizations.Organizations.find(org => (org.IsActive === isActive)); organization.Organizations[0] = organizations.Organizations.find(org => org.IsActive === isActive);
}; }
//Если не удалось получить активную организацию //Если не удалось получить активную организацию
if (!organization.Organizations[0]) { if (!organization.Organizations[0]) {
//Если нет организации не в роуминге и найдено более одной организации //Если нет организации не в роуминге и найдено более одной организации
@ -167,10 +166,10 @@ const getOrganizations = organizations => {
//Получение организации по ИНН/КПП контрагента //Получение организации по ИНН/КПП контрагента
const getOrganization = async (sSrvRoot, headers, nInn, nKpp) => { const getOrganization = async (sSrvRoot, headers, nInn, nKpp) => {
//Параметры запроса //Параметры запроса
let rqpOptions; let httpRequestOptions;
let serverResp; let serverResp;
//Формируем запрос для получения BoxId //Формируем запрос для получения BoxId
rqpOptions = { httpRequestOptions = {
uri: buildOrganizationURL(sSrvRoot), uri: buildOrganizationURL(sSrvRoot),
qs: { qs: {
inn: nInn, inn: nInn,
@ -180,18 +179,18 @@ const getOrganization = async (sSrvRoot, headers, nInn, nKpp) => {
json: true json: true
}; };
//Выполним запрос //Выполним запрос
serverResp = { Organizations: [await rqp(rqpOptions)] }; serverResp = { Organizations: [await httpRequest(httpRequestOptions)] };
return serverResp; return serverResp;
}; };
//Получение ящика организации по ИНН/КПП контрагента //Получение ящика организации по ИНН/КПП контрагента
const getOrganizationBoxId = async (sSrvRoot, headers, nInn, nKpp) => { const getOrganizationBoxId = async (sSrvRoot, headers, nInn, nKpp) => {
//Параметры запроса //Параметры запроса
let rqpOptions; let httpRequestOptions;
let serverResp; let serverResp;
let organization = {}; let organization = {};
//Формируем запрос для получения BoxId по ИНН/КПП //Формируем запрос для получения BoxId по ИНН/КПП
rqpOptions = { httpRequestOptions = {
uri: buildOrganizationsByInnKppURL(sSrvRoot), uri: buildOrganizationsByInnKppURL(sSrvRoot),
qs: { qs: {
inn: nInn, inn: nInn,
@ -202,7 +201,7 @@ const getOrganizationBoxId = async (sSrvRoot, headers, nInn, nKpp) => {
}; };
try { try {
//Выполним запрос //Выполним запрос
serverResp = await rqp(rqpOptions); serverResp = await httpRequest(httpRequestOptions);
try { try {
//Получим организацию не в роуминге (или единственную организацию в роуминге) //Получим организацию не в роуминге (или единственную организацию в роуминге)
serverResp = getOrganizations(serverResp); serverResp = getOrganizations(serverResp);
@ -212,9 +211,9 @@ const getOrganizationBoxId = async (sSrvRoot, headers, nInn, nKpp) => {
} catch (e) { } catch (e) {
//Получим головную организацию по ИНН/КПП //Получим головную организацию по ИНН/КПП
serverResp = await getOrganization(sSrvRoot, headers, nInn, nKpp); serverResp = await getOrganization(sSrvRoot, headers, nInn, nKpp);
}; }
//Проверим соответствие КПП организации //Проверим соответствие КПП организации
if ((serverResp?.Organizations[0]?.Kpp != nKpp) && (serverResp?.Organizations[0])) { if (serverResp?.Organizations[0]?.Kpp != nKpp && serverResp?.Organizations[0]) {
//Если КПП не соответстует заданному - проверим, что в полученной организации есть департамент с заданным КПП //Если КПП не соответстует заданному - проверим, что в полученной организации есть департамент с заданным КПП
for (let i in serverResp.Organizations[0].Departments) { for (let i in serverResp.Organizations[0].Departments) {
//Если найден подходящий департамент - запомним идентификатор и выходим из цикла //Если найден подходящий департамент - запомним идентификатор и выходим из цикла
@ -222,23 +221,25 @@ const getOrganizationBoxId = async (sSrvRoot, headers, nInn, nKpp) => {
//Сохраняем полученный ответ //Сохраняем полученный ответ
organization.DepartmentId = serverResp.Organizations[0].Departments[i].DepartmentId; organization.DepartmentId = serverResp.Organizations[0].Departments[i].DepartmentId;
break; break;
}; }
}; }
}; }
//Не удалось получить ящик получателя или полученая организация не соответствует заданному ИНН //Не удалось получить ящик получателя или полученая организация не соответствует заданному ИНН
if ((!serverResp?.Organizations[0]?.Boxes[0]?.BoxId) || (serverResp?.Organizations[0]?.Inn != nInn)) { if (!serverResp?.Organizations[0]?.Boxes[0]?.BoxId || serverResp?.Organizations[0]?.Inn != nInn) {
throw new Error(`Не удалось получить ящик получателя для контрагента с ИНН: ${nInn} и КПП: ${nKpp}`); throw new Error(`Не удалось получить ящик получателя для контрагента с ИНН: ${nInn} и КПП: ${nKpp}`);
} }
//Не удалось получить департаментом с соответствующим КПП //Не удалось получить департаментом с соответствующим КПП
if ((serverResp?.Organizations[0]?.Kpp != nKpp) && (!organization?.DepartmentId)) { if (serverResp?.Organizations[0]?.Kpp != nKpp && !organization?.DepartmentId) {
throw new Error(`Не удалось получить ящик получателя для контрагента с ИНН: ${nInn} и КПП: ${nKpp}, у головной организации отсутствует подразделение с КПП: ${nKpp}`); throw new Error(
`Не удалось получить ящик получателя для контрагента с ИНН: ${nInn} и КПП: ${nKpp}, у головной организации отсутствует подразделение с КПП: ${nKpp}`
);
} }
//Сохраняем полученный ответ //Сохраняем полученный ответ
organization.BoxId = serverResp.Organizations[0].Boxes[0].BoxId; organization.BoxId = serverResp.Organizations[0].Boxes[0].BoxId;
return organization; return organization;
} catch (e) { } catch (e) {
throw Error(`Ошибка при получении ящика получателя: ${e.message}`); throw Error(`Ошибка при получении ящика получателя: ${e.message}`);
}; }
}; };
//Обработчик "До" подключения к сервису //Обработчик "До" подключения к сервису
@ -296,12 +297,10 @@ const beforeMessagePost = async prms => {
} }
//Если не достали из контекста токен доступа - значит нет аутентификации на сервере //Если не достали из контекста токен доступа - значит нет аутентификации на сервере
if (!sToken) return { bUnAuth: true }; if (!sToken) return { bUnAuth: true };
//Получим параметры запроса
const optionsData = await toJSON(prms.queue.sOptions);
//Конвертируем XML из "Парус 8" в JSON //Конвертируем XML из "Парус 8" в JSON
let obj = await toJSON(prms.queue.blMsg.toString()); let obj = await toJSON(prms.queue.blMsg.toString());
//Формируем запрос для получения FromBoxId //Формируем запрос для получения FromBoxId
let rqpOptions = { let httpRequestOptions = {
uri: buildMyOrganizationURL(prms.service.sSrvRoot), uri: buildMyOrganizationURL(prms.service.sSrvRoot),
headers: buildHeaders(sAPIClientId, sToken), headers: buildHeaders(sAPIClientId, sToken),
json: true json: true
@ -311,11 +310,11 @@ const beforeMessagePost = async prms => {
let organization; let organization;
try { try {
//Выполним запрос //Выполним запрос
serverResp = await rqp(rqpOptions); serverResp = await httpRequest(httpRequestOptions);
//Получим идентификатор организации по ИНН/КПП поставщика документа //Получим идентификатор организации по ИНН/КПП поставщика документа
for (let i in serverResp.Organizations) { for (let i in serverResp.Organizations) {
//Если найдена подходящая организация - запомним идентификатор и выходим из цикла //Если найдена подходящая организация - запомним идентификатор и выходим из цикла
if (serverResp.Organizations[i].Inn == optionsData.inn_pr && serverResp.Organizations[i].Kpp == optionsData.kpp_pr) { if (serverResp.Organizations[i].Inn == prms.options.inn_pr && serverResp.Organizations[i].Kpp == prms.options.kpp_pr) {
//Сохраняем полученный ответ //Сохраняем полученный ответ
obj.FromBoxId = serverResp.Organizations[i].Boxes[0].BoxId; obj.FromBoxId = serverResp.Organizations[i].Boxes[0].BoxId;
break; break;
@ -323,18 +322,23 @@ const beforeMessagePost = async prms => {
} }
//Не удалось получить ящик отправителя //Не удалось получить ящик отправителя
if (!obj.FromBoxId) { if (!obj.FromBoxId) {
throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${optionsData.inn_pr} и КПП: ${optionsData.kpp_pr}`); throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${prms.options.inn_pr} и КПП: ${prms.options.kpp_pr}`);
} }
} catch (e) { } catch (e) {
throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`); throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`);
} }
//Получим ящик получателя //Получим ящик получателя
organization = await getOrganizationBoxId(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), optionsData.inn_cs, optionsData.kpp_cs); organization = await getOrganizationBoxId(
prms.service.sSrvRoot,
buildHeaders(sAPIClientId, sToken),
prms.options.inn_cs,
prms.options.kpp_cs
);
obj.ToBoxId = organization.BoxId; obj.ToBoxId = organization.BoxId;
//Если не заполнен идентификатор подразделения и при получении ящика удалось его подобрать //Если не заполнен идентификатор подразделения и при получении ящика удалось его подобрать
if ((!obj.ToDepartmentId) && (organization.DepartmentId)) { if (!obj.ToDepartmentId && organization.DepartmentId) {
obj.ToDepartmentId = organization.DepartmentId; obj.ToDepartmentId = organization.DepartmentId;
}; }
//Если пришел ответ //Если пришел ответ
if (prms.queue.blResp && serverResp.statusCode == 200) { if (prms.queue.blResp && serverResp.statusCode == 200) {
//Вернем загруженный документ //Вернем загруженный документ
@ -456,21 +460,19 @@ const beforeEvent = async prms => {
} }
//Если не достали из контекста токен доступа - значит нет аутентификации на сервере //Если не достали из контекста токен доступа - значит нет аутентификации на сервере
if (!sToken) return { bUnAuth: true }; if (!sToken) return { bUnAuth: true };
//Получим параметры запроса
const optionsData = await toJSON(prms.queue.sOptions);
//Формируем запрос для получения BoxId //Формируем запрос для получения BoxId
let rqpOptions = { let httpRequestOptions = {
uri: buildMyOrganizationURL(prms.service.sSrvRoot), uri: buildMyOrganizationURL(prms.service.sSrvRoot),
headers: buildHeaders(sAPIClientId, sToken), headers: buildHeaders(sAPIClientId, sToken),
json: true json: true
}; };
try { try {
//Выполним запрос //Выполним запрос
serverResp = await rqp(rqpOptions); serverResp = await httpRequest(httpRequestOptions);
//Получим идентификатор организации по ИНН/КПП контрагента организации //Получим идентификатор организации по ИНН/КПП контрагента организации
for (let i in serverResp.Organizations) { for (let i in serverResp.Organizations) {
//Если найдена подходящая организация - запомним идентификатор и выходим из цикла //Если найдена подходящая организация - запомним идентификатор и выходим из цикла
if (serverResp.Organizations[i].Inn == optionsData.inn && serverResp.Organizations[i].Kpp == optionsData.kpp) { if (serverResp.Organizations[i].Inn == prms.options.inn && serverResp.Organizations[i].Kpp == prms.options.kpp) {
//Сохраняем полученный ответ //Сохраняем полученный ответ
sBoxId = serverResp.Organizations[i].Boxes[0].BoxId; sBoxId = serverResp.Organizations[i].Boxes[0].BoxId;
//Если задано подразделение //Если задано подразделение
@ -497,7 +499,7 @@ const beforeEvent = async prms => {
} }
//Не удалось получить ящик текущей организации //Не удалось получить ящик текущей организации
if (!sBoxId) { if (!sBoxId) {
throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${optionsData.inn} и КПП: ${optionsData.kpp}`); throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${prms.options.inn} и КПП: ${prms.options.kpp}`);
} }
} catch (e) { } catch (e) {
throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`); throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`);
@ -543,7 +545,7 @@ const afterEvent = async prms => {
let sAPIClientId = null; //Ключ разработчика let sAPIClientId = null; //Ключ разработчика
let sToken = null; //Токен доступа let sToken = null; //Токен доступа
let resu = null; //Ответ сервера let resu = null; //Ответ сервера
let rqpOptions = null; //Параметры для запроса информации по ящику let httpRequestOptions = null; //Параметры для запроса информации по ящику
let serverResp; //Результат запроса информации по организации let serverResp; //Результат запроса информации по организации
let resp = null; //Ответ сервера let resp = null; //Ответ сервера
let box = null; //Информация ящика let box = null; //Информация ящика
@ -559,10 +561,10 @@ const afterEvent = async prms => {
//Получим список уникальных ящиков //Получим список уникальных ящиков
for (let i in resp.Events) { for (let i in resp.Events) {
if (resp.Events[i]?.Message) { if (resp.Events[i]?.Message) {
if ((!boxIds.boxIds.find(box => box.boxId === resp.Events[i]?.Message.FromBoxId)) && (resp.Events[i]?.Message.FromBoxId)) { if (!boxIds.boxIds.find(box => box.boxId === resp.Events[i]?.Message.FromBoxId) && resp.Events[i]?.Message.FromBoxId) {
boxIds.boxIds.push({ boxId: resp.Events[i]?.Message.FromBoxId }); boxIds.boxIds.push({ boxId: resp.Events[i]?.Message.FromBoxId });
} }
if ((!boxIds.boxIds.find(box => box.boxId === resp.Events[i]?.Message.ToBoxId)) && (resp.Events[i]?.Message.ToBoxId)) { if (!boxIds.boxIds.find(box => box.boxId === resp.Events[i]?.Message.ToBoxId) && resp.Events[i]?.Message.ToBoxId) {
boxIds.boxIds.push({ boxId: resp.Events[i]?.Message.ToBoxId }); boxIds.boxIds.push({ boxId: resp.Events[i]?.Message.ToBoxId });
} }
} }
@ -576,9 +578,9 @@ const afterEvent = async prms => {
//Если не достали из контекста токен доступа - значит нет аутентификации на сервере //Если не достали из контекста токен доступа - значит нет аутентификации на сервере
if (!sToken) return { bUnAuth: true }; if (!sToken) return { bUnAuth: true };
for (let i in boxIds.boxIds) { for (let i in boxIds.boxIds) {
rqpOptions = null; httpRequestOptions = null;
//Формируем запрос для получения BoxId //Формируем запрос для получения BoxId
rqpOptions = { httpRequestOptions = {
uri: buildOrganizationBoxIdURL(prms.service.sSrvRoot), uri: buildOrganizationBoxIdURL(prms.service.sSrvRoot),
headers: buildHeaders(sAPIClientId, sToken), headers: buildHeaders(sAPIClientId, sToken),
qs: { qs: {
@ -588,7 +590,7 @@ const afterEvent = async prms => {
}; };
try { try {
//Выполним запрос //Выполним запрос
serverResp = await rqp(rqpOptions); serverResp = await httpRequest(httpRequestOptions);
if (serverResp?.Organization) { if (serverResp?.Organization) {
//Запишем полученную информацию о контрагенте //Запишем полученную информацию о контрагенте
boxIds.boxIds[i].Inn = serverResp?.Organization?.Inn; boxIds.boxIds[i].Inn = serverResp?.Organization?.Inn;
@ -702,7 +704,7 @@ const beforeDocLoad = async prms => {
surl = `${surl}?${msgId}${prms.options.smsgid}&${entId}${prms.options.sentid}`; surl = `${surl}?${msgId}${prms.options.smsgid}&${entId}${prms.options.sentid}`;
let obj; let obj;
let rblMsg; let rblMsg;
if (prms.queue.blMsg && (prms.options.type != 5) && (prms.options.type != 6)) { if (prms.queue.blMsg && prms.options.type != 5 && prms.options.type != 6) {
//Конвертируем XML из "Парус 8" в понятный "ДИАДОК" JSON //Конвертируем XML из "Парус 8" в понятный "ДИАДОК" JSON
obj = await toJSON(prms.queue.blMsg.toString()); obj = await toJSON(prms.queue.blMsg.toString());
rblMsg = Buffer.from(JSON.stringify(obj)); rblMsg = Buffer.from(JSON.stringify(obj));
@ -751,7 +753,7 @@ const afterDocLoad = async prms => {
await new Promise(resolve => setTimeout(resolve, 2000)); await new Promise(resolve => setTimeout(resolve, 2000));
} }
//Выполним повторный запрос //Выполним повторный запрос
serverResp = await rqp(prms.options); serverResp = await httpRequest(prms.options);
//Сохраняем полученный ответ //Сохраняем полученный ответ
prms.queue.blResp = Buffer.from(serverResp.body || ""); prms.queue.blResp = Buffer.from(serverResp.body || "");
prms.optionsResp.statusCode = serverResp.statusCode; prms.optionsResp.statusCode = serverResp.statusCode;
@ -837,8 +839,6 @@ const beforeDepartmentIdGet = async prms => {
const afterDepartmentIdGet = async prms => { const afterDepartmentIdGet = async prms => {
let resu = null; let resu = null;
let organization = {}; let organization = {};
//Получим параметры запроса
const optionsData = await toJSON(prms.queue.sOptions);
//Действие выполнено успешно //Действие выполнено успешно
if (prms.optionsResp.statusCode == 200) { if (prms.optionsResp.statusCode == 200) {
try { try {
@ -846,7 +846,7 @@ const afterDepartmentIdGet = async prms => {
//Получим организацию не в роуминге (или единственную организацию в роуминге) //Получим организацию не в роуминге (или единственную организацию в роуминге)
organization = getOrganizations(JSON.parse(prms.queue.blResp.toString())); organization = getOrganizations(JSON.parse(prms.queue.blResp.toString()));
if (!organization) { if (!organization) {
throw Error(`Не удалось получить ящик для контрагента с ИНН: ${optionsData.nINN} и КПП: ${optionsData.nKPP}`); throw Error(`Не удалось получить ящик для контрагента с ИНН: ${prms.options.nINN} и КПП: ${prms.options.nKPP}`);
} }
} catch (e) { } catch (e) {
//Получим ключ разработчика //Получим ключ разработчика
@ -854,8 +854,8 @@ const afterDepartmentIdGet = async prms => {
//Считаем токен доступа из контекста сервиса //Считаем токен доступа из контекста сервиса
let sToken = prms.service.sCtx; let sToken = prms.service.sCtx;
//Получим головную организацию по ИНН/КПП //Получим головную организацию по ИНН/КПП
organization = await getOrganization(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), optionsData.nINN, optionsData.nKPP); organization = await getOrganization(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), prms.options.nINN, prms.options.nKPP);
}; }
//Преобразуем JSON ответ сервиса "ДИАДОК" в XML, понятный "Парус 8" //Преобразуем JSON ответ сервиса "ДИАДОК" в XML, понятный "Парус 8"
resu = toXML({ root: organization }); resu = toXML({ root: organization });
} catch (e) { } catch (e) {

View File

@ -8,6 +8,25 @@
//---------------------- //----------------------
const oracledb = require("oracledb"); //Работа с СУБД Oracle const oracledb = require("oracledb"); //Работа с СУБД Oracle
const { makeErrorText } = require("../core/utils"); //Вспомогательные функции
//---------------------------
// Инициализация Thick-режима
//---------------------------
//Инициализируем Thick-режим до любых подключений к БД
try {
if (typeof oracledb.initOracleClient === "function" && !(process.env.NODE_ORACLE_DB_THIN_MODE === 1)) {
const libDir = process.env.ORACLE_CLIENT_LIB_DIR;
if (libDir) {
oracledb.initOracleClient({ libDir });
} else {
oracledb.initOracleClient();
}
}
} catch (e) {
throw new Error(`Не удалось инициализировать Oracle Client (Thick-режим): ${makeErrorText(e)}`);
}
//-------------------------- //--------------------------
// Глобальные идентификаторы // Глобальные идентификаторы

View File

@ -8,7 +8,7 @@
//------------------------------ //------------------------------
const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML
const _ = require("lodash"); //Работа с коллекциями и объектами const { deepClone } = require("../core/utils"); //Вспомогательные функции
//--------------------- //---------------------
// Глобальные константы // Глобальные константы
@ -48,24 +48,23 @@ const converXMLArraysToJSON = (obj, arrayKey) => {
let tmp = []; let tmp = [];
let itemKey = obj[key][arrayKey]; let itemKey = obj[key][arrayKey];
if (obj[key][itemKey]) { if (obj[key][itemKey]) {
if (_.isArray(obj[key][itemKey])) { if (Array.isArray(obj[key][itemKey])) {
for (let i = 0; i < obj[key][itemKey].length; i++) { for (let i = 0; i < obj[key][itemKey].length; i++) {
let buf = {}; let buf = {};
buf[itemKey] = _.cloneDeep(obj[key][itemKey][i]); buf[itemKey] = deepClone(obj[key][itemKey][i]);
tmp.push(buf); tmp.push(buf);
} }
} else { } else {
let buf = {}; let buf = {};
buf[itemKey] = _.cloneDeep(obj[key][itemKey]); buf[itemKey] = deepClone(obj[key][itemKey]);
tmp.push(buf); tmp.push(buf);
} }
} }
obj[key] = tmp; obj[key] = tmp;
converXMLArraysToJSON(obj[key], arrayKey); converXMLArraysToJSON(obj[key], arrayKey);
} else { } else {
if (_.isObject(obj[key])) converXMLArraysToJSON(obj[key], arrayKey); if (typeof obj[key] === "object" && obj[key] !== null && !Array.isArray(obj[key])) converXMLArraysToJSON(obj[key], arrayKey);
if (_.isArray(obj[key])) if (Array.isArray(obj[key])) for (let i = 0; i < obj[key].length; i++) converXMLArraysToJSON(obj[key][i], arrayKey);
for (let i = 0; i < obj[key].length; i++) converXMLArraysToJSON(obj[key][i], arrayKey);
} }
} }
}; };
@ -73,10 +72,7 @@ const converXMLArraysToJSON = (obj, arrayKey) => {
//Обработчик "До" для полученного сообщения //Обработчик "До" для полученного сообщения
const before = async prms => { const before = async prms => {
//Если пришел запрос в JSON //Если пришел запрос в JSON
if ( if (prms.options.headers["content-type"] && prms.options.headers["content-type"].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON)) {
prms.options.headers["content-type"] &&
prms.options.headers["content-type"].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON)
) {
//Конвертируем полученный в JSON-запрос в XML, понятный серверной части //Конвертируем полученный в JSON-запрос в XML, понятный серверной части
let requestXML = ""; let requestXML = "";
try { try {
@ -102,11 +98,8 @@ const after = async prms => {
prms.options.qs[SQUERY_RESP_CT] && prms.options.qs[SQUERY_RESP_CT] &&
prms.options.qs[SQUERY_RESP_CT].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON)) || prms.options.qs[SQUERY_RESP_CT].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON)) ||
(prms.function.sCode != SFNC_UPLOAD && (prms.function.sCode != SFNC_UPLOAD &&
((prms.options.headers["content-type"] && ((prms.options.headers["content-type"] && prms.options.headers["content-type"].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON)) ||
prms.options.headers["content-type"].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON)) || (prms.options.qs && prms.options.qs[SQUERY_RESP_CT] && prms.options.qs[SQUERY_RESP_CT].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON))))
(prms.options.qs &&
prms.options.qs[SQUERY_RESP_CT] &&
prms.options.qs[SQUERY_RESP_CT].startsWith(SHEADER_REQ_CONTENT_TYPE_JSON))))
) { ) {
//Буфер для конвертации //Буфер для конвертации
let parseRes = ""; let parseRes = "";

View File

@ -8,8 +8,7 @@
//------------------------------ //------------------------------
const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML
const _ = require("lodash"); //Работа с коллекциями и объектами const httpRequest = require("../core/http_client"); //Работа с HTTP/HTTPS запросами
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const { SMCHD_STORAGE_SYSTEM } = require("./sbis_config"); //Система хранения МЧД const { SMCHD_STORAGE_SYSTEM } = require("./sbis_config"); //Система хранения МЧД
//--------------------- //---------------------
@ -33,7 +32,7 @@ const tag = [
// Список имен тегов для замены ([Старое значение], [Новое значение]) // Список имен тегов для замены ([Старое значение], [Новое значение])
const replaceTags = [ const replaceTags = [
['"Иные получатели":', '"ИныеПолучатели":'], ['"Иные получатели":', '"ИныеПолучатели":'],
['"Создатель документа":', '"СоздательДокумента":'], ['"Создатель документа":', '"СоздательДокумента":']
]; ];
//------------ //------------
@ -41,9 +40,9 @@ const replaceTags = [
//------------ //------------
//Замена наименований тегов (для корректной работы toXML) //Замена наименований тегов (для корректной работы toXML)
const replaceTag = (obj) => { const replaceTag = obj => {
for (let value of replaceTags) { for (let value of replaceTags) {
obj = obj.replace(new RegExp(value[0], 'g'), value[1]); obj = obj.replace(new RegExp(value[0], "g"), value[1]);
} }
return obj; return obj;
}; };
@ -52,7 +51,7 @@ const replaceTag = (obj) => {
const toArray = (obj, tags) => { const toArray = (obj, tags) => {
for (const prop in obj) { for (const prop in obj) {
const value = obj[prop]; const value = obj[prop];
if (tags.indexOf(prop) != -1 && !_.isArray(obj[prop])) { if (tags.indexOf(prop) != -1 && !Array.isArray(obj[prop])) {
obj[prop] = JSON.parse("[" + JSON.stringify(value) + "]"); obj[prop] = JSON.parse("[" + JSON.stringify(value) + "]");
} }
if (typeof value === "object") { if (typeof value === "object") {
@ -255,7 +254,7 @@ const afterAttParse = async prms => {
if (prms.optionsResp.statusCode != 200) { if (prms.optionsResp.statusCode != 200) {
//Выполним повторный запрос //Выполним повторный запрос
await new Promise(resolve => setTimeout(resolve, 2000)); await new Promise(resolve => setTimeout(resolve, 2000));
let serverResp = await rqp(prms.options); let serverResp = await httpRequest(prms.options);
//Сохраняем полученный ответ //Сохраняем полученный ответ
prms.queue.blResp = Buffer.from(serverResp.body || ""); prms.queue.blResp = Buffer.from(serverResp.body || "");
prms.optionsResp.statusCode = serverResp.statusCode; prms.optionsResp.statusCode = serverResp.statusCode;

3034
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -18,24 +18,18 @@
}, },
"homepage": "https://git.citpb.ru/CITKParus/P8-ExchangeService/", "homepage": "https://git.citpb.ru/CITKParus/P8-ExchangeService/",
"dependencies": { "dependencies": {
"body-parser": "^1.19.0",
"cors": "^2.8.5", "cors": "^2.8.5",
"express": "^4.17.1", "express": "^5.2.1",
"kafkajs": "^2.2.4", "kafkajs": "^2.2.4",
"lodash": "^4.17.19",
"module-alias": "^2.2.2",
"mqtt": "^5.10.1", "mqtt": "^5.10.1",
"nodemailer": "^6.4.11", "nodemailer": "^6.9.16",
"oracledb": "^4.2.0", "oracledb": "^6.6.0",
"pg": "^8.13.1", "pg": "^8.13.1",
"request": "^2.88.2", "undici": "^6.0.0",
"request-promise": "^4.2.6",
"validate": "^5.1.0", "validate": "^5.1.0",
"xml2js": "^0.4.23" "xml2js": "^0.6.2"
}, },
"_moduleAliases": { "engines": {
"@core": "core", "node": ">=22"
"@modules": "modules",
"@models": "models"
} }
} }