diff --git a/config.js b/config.js index 9843027..83345c5 100644 --- a/config.js +++ b/config.js @@ -14,7 +14,9 @@ let common = { //Релиз сервера приложений sRelease: "2023.10.10", //Таймаут останова сервера (мс) - nTerminateTimeout: 60000 + nTerminateTimeout: 60000, + //Контролировать версию Системы + bControlSystemVersion: false }; //Параметры подключения к БД @@ -46,7 +48,9 @@ let outGoing = { //Максимальный размер пула подключений к БД для обработчика исходящих сообщений nPoolMax: 4, //Шаг инкремента подключений к БД в пуле обработчика исходящих сообщений - nPoolIncrement: 0 + nPoolIncrement: 0, + //Глобальный адрес прокси-сервера + sProxy: null }; //Параметры обработки очереди входящих сообщений diff --git a/config_default.js b/config_default.js index a109c95..61a1f88 100644 --- a/config_default.js +++ b/config_default.js @@ -14,7 +14,9 @@ let common = { //Релиз сервера приложений sRelease: "2023.10.10", //Таймаут останова сервера (мс) - nTerminateTimeout: 60000 + nTerminateTimeout: 60000, + //Контролировать версию Системы + bControlSystemVersion: false }; //Параметры подключения к БД diff --git a/core/app.js b/core/app.js index 9b6311b..756e0cf 100644 --- a/core/app.js +++ b/core/app.js @@ -93,7 +93,7 @@ class ParusAppServer { async onOutQStarted() { //Сообщим, что запустили обработчик await this.logger.info("Обработчик очереди исходящих сообщений запущен"); - //Запускаем бслуживание очереди входящих + //Запускаем обслуживание очереди входящих await this.logger.info("Запуск обработчика очереди входящих сообщений..."); try { this.inQ.startProcessing({ services: this.services }); @@ -202,6 +202,8 @@ class ParusAppServer { this.dbConn = new db.DBConnector({ connectSettings: { ...prms.config.dbConnect, + sRelease: prms.config.common.sRelease, + bControlSystemVersion: prms.config.common.bControlSystemVersion, nPoolMin: prms.config.inComing.nPoolMin, nPoolMax: prms.config.inComing.nPoolMax, nPoolIncrement: prms.config.inComing.nPoolIncrement, @@ -215,7 +217,8 @@ class ParusAppServer { outGoing: prms.config.outGoing, dbConn: this.dbConn, logger: this.logger, - notifier: this.notifier + notifier: this.notifier, + sProxy: prms.config.outGoing.sProxy }); //Создаём обработчик очереди входящих this.inQ = new iq.InQueue({ @@ -229,7 +232,8 @@ class ParusAppServer { this.srvAvlCtrl = new sac.ServiceAvailableController({ logger: this.logger, notifier: this.notifier, - dbConn: this.dbConn + dbConn: this.dbConn, + sProxy: prms.config.outGoing.sProxy }); //Скажем что инициализировали await this.logger.info("Сервер приложений инициализирован"); diff --git a/core/in_queue.js b/core/in_queue.js index 15fd297..78c77b8 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -312,14 +312,14 @@ class InQueue extends EventEmitter { if (optionsResp.headers) prms.res.set(optionsResp.headers); prms.res.status(optionsResp.statusCode || 200).send(blResp); } + //Фиксируем успех обработки - в протоколе работы сервиса + await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); //Фиксируем успех обработки - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, nIncExecCnt: NINC_EXEC_CNT_YES, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK }); - //Фиксируем успех обработки - в протоколе работы сервиса - await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); } catch (e) { //Тема и текст уведомления об ошибке let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`; diff --git a/core/out_queue.js b/core/out_queue.js index aef4c2f..1c60d8b 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -64,6 +64,8 @@ class OutQueue extends EventEmitter { this.logger = prms.logger; //Запомним уведомитель this.notifier = prms.notifier; + //Запомним глобальный адрес прокси-сервера + this.sProxy = prms.sProxy; //Список обрабатываемых в текущий момент сообщений очереди this.inProgress = []; //Привяжем методы к указателю на себя для использования в обработчиках событий @@ -160,7 +162,8 @@ class OutQueue extends EventEmitter { service: _.find(this.services, { nId: prms.queue.nServiceId }), function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, { nId: prms.queue.nServiceFnId - }) + }), + sProxy: this.sProxy }); //Уменьшаем количество доступных обработчиков this.nWorkersLeft--; @@ -244,8 +247,11 @@ class OutQueue extends EventEmitter { const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); //Перехват сообщений обработчика proc.on("message", async result => { - //Считываем сообщение изменённое обработчиком - prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId }); + //Перечитывание не требуется, если выполнено успешно + if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) { + //Перечитываем запись очереди с учетом изменения статуса + prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId }); + } //Проверяем структуру полученного сообщения let sCheckResult = validateObject( result, diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 0e4b7a0..05c09dc 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -145,6 +145,11 @@ const appProcess = async prms => { sQuery: prms.queue.blMsg === null ? "" : prms.queue.blMsg.toString() }); } + // Если у сервиса указан прокси, либо у приложения установлен глобальный прокси + if (prms.service.sProxyURL || prms.sProxy) { + // Добавляем прокси с приоритетом сервиса + options.proxy = prms.service.sProxyURL ?? prms.sProxy; + } //Дополним получившиеся параметры переданными в сообщении if (prms.queue.sOptions) { try { @@ -430,16 +435,16 @@ const dbProcess = async prms => { //Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); } + //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса + await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { + nQueueId: prms.queue.nId + }); //Фиксируем успешное исполнение (полное - дальше обработки нет) - в статусе сообщения res = await dbConn.setQueueState({ nQueueId: prms.queue.nId, nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK }); - //Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса - await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, { - nQueueId: prms.queue.nId - }); } catch (e) { //Если была ошибка аутентификации - возвращаем на повторную обработку сервером приложений if (e instanceof ServerError && e.sCode == SERR_UNAUTH) { @@ -509,7 +514,8 @@ const processTask = async prms => { let res = await appProcess({ queue: q, service: prms.task.service, - function: prms.task.function + function: prms.task.function, + sProxy: prms.task.sProxy }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) { diff --git a/core/service_available_controller.js b/core/service_available_controller.js index 1f241d2..a55d1c0 100644 --- a/core/service_available_controller.js +++ b/core/service_available_controller.js @@ -67,6 +67,8 @@ class ServiceAvailableController extends EventEmitter { this.logger = prms.logger; //Запомним подключение к БД this.dbConn = prms.dbConn; + //Запомним глобальный адрес прокси-сервера + this.sProxy = prms.sProxy; //Привяжем методы к указателю на себя для использования в обработчиках событий this.serviceDetectingLoop = this.serviceDetectingLoop.bind(this); } else { @@ -110,8 +112,18 @@ class ServiceAvailableController extends EventEmitter { service.nSrvType == objServiceSchema.NSRV_TYPE_SEND ) { try { + // Инициализируем параметры запроса + let options = {}; + // Устанавливаем параметры запроса + options.url = service.sSrvRoot; + options.timeout = NNETWORK_CHECK_TIMEOUT; + // Если у сервиса указан прокси, либо у приложения установлен глобальный прокси + if (service.sProxyURL || this.sProxy) { + // Добавляем прокси с приоритетом сервиса + options.proxy = service.sProxyURL ?? this.sProxy; + } //Отправляем проверочный запрос - await rqp({ url: service.sSrvRoot, timeout: NNETWORK_CHECK_TIMEOUT }); + await rqp(options); //Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности service.dAvailable = new Date(); service.dUnAvailable = null; diff --git a/models/obj_config.js b/models/obj_config.js index 96064c9..63fa321 100644 --- a/models/obj_config.js +++ b/models/obj_config.js @@ -77,6 +77,16 @@ const common = new Schema({ validateTerminateTimeout: path => `Таймаут останова сервера (${path}) должен быть целым числом в диапазоне от 1000 до 120000` } + }, + //Контроль версии Системы + bControlSystemVersion: { + type: Boolean, + required: true, + message: { + type: path => + `Признак контроля версии Системы (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан признак контроля версии Системы (${path})` + } } }); @@ -220,6 +230,17 @@ const outGoing = new Schema({ validatePoolIncrementOutGoing: path => `Значение шага инкремента подключений к БД в пуле обработчика исходящих сообщений (${path}) должно быть целым числом в диапазоне от 0 до 10` } + }, + //Глобальный адрес прокси-сервера + sProxy: { + type: String, + required: false, + message: { + type: path => + `Адрес прокси-сервера приложения (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => + `Не указан глобальный адрес прокси-сервера (${path})` + } } }); diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index bcabc16..43e5c05 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -65,6 +65,15 @@ exports.OutQueueProcessorTask = new Schema({ message: { required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` } + }, + //Глобальный адрес прокси-сервера + sProxy: { + type: String, + required: false, + message: { + type: path => `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан глобальный адрес прокси-сервера (${path})` + } } }); diff --git a/models/obj_service.js b/models/obj_service.js index f151b88..5d86db2 100644 --- a/models/obj_service.js +++ b/models/obj_service.js @@ -189,6 +189,16 @@ exports.Service = new Schema({ `Неверный формат списка адресов E-Mail для оповещения о простое внешнего сервиса (${path}), для указания нескольких адресов следует использовать запятую в качестве разделителя (без пробелов)` } }, + //Адрес прокси-сервера + sProxyURL: { + type: String, + required: false, + message: { + type: path => + `Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан адрес прокси-сервера в очереди обмена (${path})` + } + }, //Список функций сервиса functions: defServiceFunctions(true, "functions") }); diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 734919c..4d90322 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -66,6 +66,15 @@ exports.OutQueue = new Schema({ `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`, required: path => `Не указан объект для рассылки уведомлений (${path})` } + }, + //Глобальный адрес прокси-сервера + sProxy: { + type: String, + required: false, + message: { + type: path => `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан глобальный адрес прокси-сервера (${path})` + } } }); diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js index 39c445d..ed947b5 100644 --- a/models/prms_out_queue_processor.js +++ b/models/prms_out_queue_processor.js @@ -37,7 +37,7 @@ exports.appProcess = new Schema({ schema: Queue, required: true, message: { - required: path => `Не указано обрабатываемое сообщение очреди (${path})` + required: path => `Не указано обрабатываемое сообщение очереди (${path})` } }, //Cервис-обработчик @@ -55,6 +55,15 @@ exports.appProcess = new Schema({ message: { required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})` } + }, + //Глобальный адрес прокси-сервера + sProxy: { + type: String, + required: false, + message: { + type: path => `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан глобальный адрес прокси-сервера (${path})` + } } }); diff --git a/models/prms_service_available_controller.js b/models/prms_service_available_controller.js index b6f26df..65a525b 100644 --- a/models/prms_service_available_controller.js +++ b/models/prms_service_available_controller.js @@ -48,6 +48,16 @@ exports.ServiceAvailableController = new Schema({ `Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`, required: path => `Не указан объект для взаимодействия с БД (${path})` } + }, + //Глобальный адрес прокси-сервера + sProxy: { + type: String, + required: false, + message: { + type: path => + `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан глобальный адрес прокси-сервера (${path})` + } } }); diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index 54a46b8..f032aa3 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -70,6 +70,31 @@ const checkWorkers = async prms => { } }; +//Проверка соответствия релизов сервера приложений и системы +const checkRelease = async prms => { + let pooledConnection; + try { + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute("BEGIN :DB_RELEASE := PKG_EXS.UTL_PRODUCT_RELEASE_GET(); END;", { + DB_RELEASE: { dir: oracledb.BIND_OUT, type: oracledb.DB_TYPE_VARCHAR } + }); + let sDB_RELEASE = res.outBinds.DB_RELEASE; + if (sDB_RELEASE !== prms.sRelease) { + throw new Error(`Версия сервера приложений (${prms.sRelease}) не соответствует версии системы (${sDB_RELEASE}).`); + } + } catch (e) { + throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } + } +} + //Подключение к БД const connect = async prms => { try { @@ -93,6 +118,9 @@ const connect = async prms => { ); } }); + if (prms.bControlSystemVersion) { + await checkRelease({ sRelease: prms.sRelease, connection: pool }); + } await checkWorkers({ nMaxWorkers: prms.nMaxWorkers, connection: pool }); return pool; } catch (e) {