diff --git a/config.js b/config.js index e4190e3..be2fb74 100644 --- a/config.js +++ b/config.js @@ -56,7 +56,9 @@ let outGoing = { //Шаг инкремента подключений к БД в пуле обработчика исходящих сообщений nPoolIncrement: 0, //Глобальный адрес прокси-сервера - sProxy: null + sProxy: null, + //Таймаут параллельного процесса обработки исходящего сообщения (мс, если задан атрибут "nTimeoutWorker" функции сервиса обмена - игнорируется, 0 - не применять) + nTimeoutWorker: 0 }; //Параметры обработки очереди входящих сообщений diff --git a/core/out_queue.js b/core/out_queue.js index 3d7e8fd..380474b 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -172,14 +172,57 @@ class OutQueue extends EventEmitter { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } + //Сброс таймера принудительного останова параллельного процесса обработчика + clearWorkerTerminateTimeout(proc) { + if (proc?.terminateTimeoutPid) { + clearTimeout(proc.terminateTimeoutPid); + proc.terminateTimeoutPid = undefined; + } + } + //Определение таймаута параллельного процесса обработчика + resolveWorkerTimeoutMs(serviceFn) { + if (serviceFn?.nTimeoutWorker > 0) return serviceFn.nTimeoutWorker; + if (this.outGoing.nTimeoutWorker > 0) return this.outGoing.nTimeoutWorker; + return 0; + } //Останов обработчика async stopQueueProcessor(prms) { //Проверяем структуру переданного объекта для останова обработчика let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди"); //Если структура объекта в норме if (!sCheckResult) { + //Сбросим таймер ожидания останова + if (prms.proc) this.clearWorkerTerminateTimeout(prms.proc); //Удаляем идентификатор позиции очереди из списка обрабатываемых this.rmInProgress({ nQueueId: prms.nQueueId }); + //Завершаем процесс обработчика, если он ещё активен + let killRes = true; + let killErr = false; + if (prms.proc?.connected) { + try { + killRes = prms.proc.kill(); + } catch (e) { + killRes = false; + killErr = true; + //Отразим в протоколе ошибку останова + await this.logger.error(`Ошибка останова обработчика исходящего сообщения ${prms.nQueueId}: ${makeErrorText(e)}`, { + nQueueId: prms.nQueueId + }); + } + } + if (prms.proc) { + if (!killRes && !killErr) + await this.logger.error(`Процесс обработчика исходящего сообщения ${prms.nQueueId} не был успешно завершен`, { + nQueueId: prms.nQueueId + }); + else if (killRes) { + const terminateTimeoutFired = prms.proc.terminateTimeoutFired === true; + const message = `Процесс обработчика исходящего сообщения ${prms.nQueueId} завершен${terminateTimeoutFired ? " (по таймауту)" : ""}`; + const logData = { nQueueId: prms.nQueueId }; + if (terminateTimeoutFired) await this.logger.warn(message, logData); + else await this.logger.info(message, logData); + } + } //Сбрасываем признак "В работе" позиции очереди await this.dbConn.setInProgress({ nQueueId: prms.nQueueId, @@ -230,6 +273,11 @@ class OutQueue extends EventEmitter { if (this.nWorkersLeft > 0) { //Переопределим себя для обращения внутри обработчиков событий const self = this; + //Найдем сервис и функцию обработки сообщения + const service = this.services.find(s => s.nId === prms.queue.nServiceId); + const serviceFn = service?.functions.find(f => f.nId === prms.queue.nServiceFnId); + //Таймаут параллельного процесса обработчика (мс) + const workerTimeout = this.resolveWorkerTimeoutMs(serviceFn); //Запоминаем текущее количество попыток обработки const nQueueOldExecCnt = prms.queue.nExecCnt; //Буфер для ошибок (для журнала работы и очереди обмена) @@ -239,6 +287,8 @@ class OutQueue extends EventEmitter { const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); //Перехват сообщений обработчика proc.on("message", async result => { + //Сбросим таймер принудительного останова + self.clearWorkerTerminateTimeout(proc); //Перечитывание не требуется, если выполнено успешно if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) { //Перечитываем запись очереди с учетом изменения статуса @@ -309,6 +359,8 @@ class OutQueue extends EventEmitter { }); //Перехват ошибок обработчика proc.on("error", async e => { + //Сбросим таймер принудительного останова + self.clearWorkerTerminateTimeout(proc); //Считываем сообщение изменённое обработчиком prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId }); //Фиксируем ошибку в протоколе работы @@ -339,10 +391,10 @@ class OutQueue extends EventEmitter { } }); //Перехват останова обработчика - proc.on("exit", async code => { + proc.on("exit", async () => { try { //Завершаем процесс обработки сообщения - this.stopQueueProcessor({ nQueueId: prms.queue.nId }); + await this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); } catch (e) { //Отразим в протоколе ошибку останова await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, { @@ -352,6 +404,20 @@ class OutQueue extends EventEmitter { }); //Запускаем обработчик this.startQueueProcessor({ queue: prms.queue, proc }); + //Принудительная остановка процесса обработчика по таймауту (мс) + if (workerTimeout > 0) { + proc.terminateTimeoutPid = setTimeout(async () => { + //Таймер уже сброшен — обработка завершилась штатно + if (!proc.terminateTimeoutPid) return; + proc.terminateTimeoutPid = undefined; + proc.terminateTimeoutFired = true; + await self.logger.warn( + `Истёк интервал ожидания (${workerTimeout} мс) завершения параллельного процесса обработки исходящего сообщения ${prms.queue.nId}`, + { nQueueId: prms.queue.nId } + ); + proc.emit("error", new Error(`Истёк интервал ожидания (${workerTimeout} мс) завершения параллельного процесса`)); + }, workerTimeout); + } } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); diff --git a/models/obj_config.js b/models/obj_config.js index 05eba66..b18a3b1 100644 --- a/models/obj_config.js +++ b/models/obj_config.js @@ -67,6 +67,9 @@ const validatePoolIncrementInComing = val => val >= 0 && val <= 1000; //Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений const validateTimeoutInComing = val => val >= 0; +//Функция проверки значения таймаута параллельного процесса обработчика исходящих сообщений +const validateTimeoutWorkerOutGoing = val => val >= 0 && Number.isInteger(val); + //Функция проверки значения времени ожидания успешного подключения Kafka const validateTimeoutKafka = val => val >= 0; @@ -293,6 +296,18 @@ const outGoing = new Schema({ type: path => `Адрес прокси-сервера приложения (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан глобальный адрес прокси-сервера (${path})` } + }, + //Таймаут параллельного процесса обработки исходящего сообщения (мс) + nTimeoutWorker: { + type: Number, + required: false, + use: { validateTimeoutWorkerOutGoing }, + message: { + type: path => + `Таймаут параллельного процесса обработки исходящего сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + validateTimeoutWorkerOutGoing: path => + `Таймаут параллельного процесса обработки исходящего сообщения (${path}) должен быть неотрицательным целым числом` + } } }); diff --git a/models/obj_service_function.js b/models/obj_service_function.js index 9f7ccdf..fd4bb91 100644 --- a/models/obj_service_function.js +++ b/models/obj_service_function.js @@ -414,5 +414,13 @@ exports.ServiceFunction = new Schema({ message: { type: path => `Таймаут асинхронной отправки функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` } + }, + //Таймаут параллельного процесса обработки исходящего сообщения (мс) + nTimeoutWorker: { + type: Number, + required: false, + message: { + type: path => `Таймаут параллельного процесса функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` + } } }); diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 690e8f0..90649fe 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -145,6 +145,14 @@ exports.stopQueueProcessor = new Schema({ type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан идентификатор сообщения (${path})` } + }, + //Процесс обработчика + proc: { + use: { validateChildProcess }, + required: false, + message: { + validateChildProcess: path => `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)` + } } });