diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index bee715f..fa2621c 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -24,7 +24,8 @@ const { getMQTTConnectionSettings, getKafkaBroker, getKafkaAuth, - getURLProtocol + getURLProtocol, + wrapPromiseTimeout } = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений @@ -359,8 +360,12 @@ const appProcess = async prms => { default: //Установим флаг возврата полного ответа (и тела и заголовков) options.resolveWithFullResponse = true; + //Установим таймаут подключения + options.timeout = prms.function.nTimeoutConn ? prms.function.nTimeoutConn : null; //Отправляем запрос - serverResp = await rqp(options); + serverResp = prms.function.nTimeoutAsynch + ? await wrapPromiseTimeout(prms.function.nTimeoutAsynch, rqp(options)) + : await rqp(options); break; } //Сохраняем полученный ответ diff --git a/core/utils.js b/core/utils.js index 46ec904..9e19326 100644 --- a/core/utils.js +++ b/core/utils.js @@ -403,6 +403,22 @@ const getURLProtocol = sURL => { return sURL.substring(0, 1) === "/" ? SPROTOCOL_HTTP : new URL(sURL).protocol.slice(0, -1); }; +//Обёртывание промиса в таймаут исполнения +const wrapPromiseTimeout = (timeout, promise) => { + if (!timeout) return promise; + let timeoutPid; + const timeoutPromise = new Promise((resolve, reject) => { + const sMessage = `Истёк интервал ожидания (${timeout} мс) завершения асинхронного процесса.`; + let e = new Error(sMessage); + e.error = sMessage; + timeoutPid = setTimeout(() => reject(e), timeout); + }); + return Promise.race([promise, timeoutPromise]).finally(() => { + if (promise.promise().isPending()) promise.cancel(); + if (timeoutPid) clearTimeout(timeoutPid); + }); +}; + //----------------- // Интерфейс модуля //----------------- @@ -428,3 +444,4 @@ exports.getMQTTConnectionSettings = getMQTTConnectionSettings; exports.getKafkaBroker = getKafkaBroker; exports.getKafkaAuth = getKafkaAuth; exports.getURLProtocol = getURLProtocol; +exports.wrapPromiseTimeout = wrapPromiseTimeout; diff --git a/models/obj_service_function.js b/models/obj_service_function.js index a8e24c3..9f7ccdf 100644 --- a/models/obj_service_function.js +++ b/models/obj_service_function.js @@ -398,5 +398,21 @@ exports.ServiceFunction = new Schema({ validateErrNtfMail: path => `Неверный формат списка адресов E-Mail для оповещения об ошибке исполнения сообщения очереди для функции обработки (${path}), для указания нескольких адресов следует использовать запятую в качестве разделителя (без пробелов)` } + }, + //Таймаут сетевого подключения (мс) + nTimeoutConn: { + type: Number, + required: false, + message: { + type: path => `Таймаут сетевого подключения функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` + } + }, + //Таймаут асинхронной отправки (мс) + nTimeoutAsynch: { + type: Number, + required: false, + message: { + type: path => `Таймаут асинхронной отправки функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` + } } });