From 113ccca2b8c4204e00593fad1916cd2955951340 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Wed, 19 Mar 2025 16:09:02 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-936=20-=20=D0=94=D0=BE?= =?UTF-8?q?=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D1=82=D0=B0?= =?UTF-8?q?=D0=B9=D0=BC=D0=B0=D1=83=D1=82=D0=BE=D0=B2=20=D0=B4=D0=BB=D1=8F?= =?UTF-8?q?=20=D1=80=D0=B5=D0=BA=D0=B2=D0=B5=D1=81=D1=82=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue_processor.js | 9 +++++++-- core/utils.js | 17 +++++++++++++++++ models/obj_service_function.js | 16 ++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index bee715f..fa2621c 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -24,7 +24,8 @@ const { getMQTTConnectionSettings, getKafkaBroker, getKafkaAuth, - getURLProtocol + getURLProtocol, + wrapPromiseTimeout } = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений @@ -359,8 +360,12 @@ const appProcess = async prms => { default: //Установим флаг возврата полного ответа (и тела и заголовков) options.resolveWithFullResponse = true; + //Установим таймаут подключения + options.timeout = prms.function.nTimeoutConn ? prms.function.nTimeoutConn : null; //Отправляем запрос - serverResp = await rqp(options); + serverResp = prms.function.nTimeoutAsynch + ? await wrapPromiseTimeout(prms.function.nTimeoutAsynch, rqp(options)) + : await rqp(options); break; } //Сохраняем полученный ответ diff --git a/core/utils.js b/core/utils.js index 46ec904..9e19326 100644 --- a/core/utils.js +++ b/core/utils.js @@ -403,6 +403,22 @@ const getURLProtocol = sURL => { return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1); }; +//Обёртывание промиса в таймаут исполнения +const wrapPromiseTimeout = (timeout, promise) => { + if (!timeout) return promise; + let timeoutPid; + const timeoutPromise = new Promise((resolve, reject) => { + const sMessage = `Истёк интервал ожидания (${timeout} мс) завершения асинхронного процесса.`; + let e = new Error(sMessage); + e.error = sMessage; + timeoutPid = setTimeout(() => reject(e), timeout); + }); + return Promise.race([promise, timeoutPromise]).finally(() => { + if (promise.promise().isPending()) promise.cancel(); + if (timeoutPid) clearTimeout(timeoutPid); + }); +}; + //----------------- // Интерфейс модуля //----------------- @@ -428,3 +444,4 @@ exports.getMQTTConnectionSettings = getMQTTConnectionSettings; exports.getKafkaBroker = getKafkaBroker; exports.getKafkaAuth = getKafkaAuth; exports.getURLProtocol = getURLProtocol; +exports.wrapPromiseTimeout = wrapPromiseTimeout; diff --git a/models/obj_service_function.js b/models/obj_service_function.js index a8e24c3..9f7ccdf 100644 --- a/models/obj_service_function.js +++ b/models/obj_service_function.js @@ -398,5 +398,21 @@ exports.ServiceFunction = new Schema({ validateErrNtfMail: path => `Неверный формат списка адресов E-Mail для оповещения об ошибке исполнения сообщения очереди для функции обработки (${path}), для указания нескольких адресов следует использовать запятую в качестве разделителя (без пробелов)` } + }, + //Таймаут сетевого подключения (мс) + nTimeoutConn: { + type: Number, + required: false, + message: { + type: path => `Таймаут сетевого подключения функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` + } + }, + //Таймаут асинхронной отправки (мс) + nTimeoutAsynch: { + type: Number, + required: false, + message: { + type: path => `Таймаут асинхронной отправки функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` + } } });