From f75269ddc7a5ba75bb6788d7d8e7be927c88d549 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Thu, 17 Jul 2025 12:01:44 +0300 Subject: [PATCH 1/2] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-255=20-=20=D0=94?= =?UTF-8?q?=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BF?= =?UTF-8?q?=D0=B0=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D0=B0=20=D0=B2=D1=80?= =?UTF-8?q?=D0=B5=D0=BC=D0=B5=D0=BD=D0=B8=20=D0=BE=D0=B6=D0=B8=D0=B4=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=BE=D1=82=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=BA=D0=B8=20=D0=B2=D1=85=D0=BE=D0=B4=D1=8F=D1=89=D0=B5=D0=B3?= =?UTF-8?q?=D0=BE=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 4 ++- core/in_queue.js | 60 +++++++++++++++++++++++++++++++------------- models/obj_config.js | 14 +++++++++++ 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/config.js b/config.js index 1207399..b744df7 100644 --- a/config.js +++ b/config.js @@ -70,7 +70,9 @@ let inComing = { //Максимальный размер пула подключений к БД для обработчика входящих сообщений nPoolMax: 10, //Шаг инкремента подключений к БД в пуле обработчика входящих сообщений - nPoolIncrement: 0 + nPoolIncrement: 0, + //Время ожидания отработки входящего сообщения (мс) + nTimeout: 120000 }; //Параметры подключения к Kafka diff --git a/core/in_queue.js b/core/in_queue.js index a6ab9c9..bf873fa 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -93,6 +93,8 @@ class InQueue extends EventEmitter { //Внешние подключения this.kafkaConnections = []; this.mqttConnections = []; + //Время ожидания отработки входящего сообщения + this.nTimeout = prms.inComing.nTimeout; } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } @@ -329,19 +331,25 @@ class InQueue extends EventEmitter { } } } - //Всё успешно - отдаём результат клиенту - if (bStopPropagation === false) { - if (optionsResp.headers) prms.res.set(optionsResp.headers); - prms.res.status(optionsResp.statusCode || 200).send(blResp); + //Если мы еще не отдали ответ от сервера + if (!prms.res.writableFinished) { + //Всё успешно - отдаём результат клиенту + if (bStopPropagation === false) { + if (optionsResp.headers) prms.res.set(optionsResp.headers); + prms.res.status(optionsResp.statusCode || 200).send(blResp); + } + //Фиксируем успех обработки - в протоколе работы сервиса + await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); + //Фиксируем успех обработки - в статусе сообщения + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + } else { + //Или расскажем об ошибке + throw new ServerError(SERR_WEB_SERVER, "Истекло время ожидания обработки входящего запроса."); } - //Фиксируем успех обработки - в протоколе работы сервиса - await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); - //Фиксируем успех обработки - в статусе сообщения - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); } catch (e) { //Тема и текст уведомления об ошибке let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`; @@ -375,8 +383,11 @@ class InQueue extends EventEmitter { sMessage }); } - //Отправим ошибку клиенту - prms.res.status(500).send(makeErrorText(e)); + //Если мы еще не отдали ответ от сервера + if (!prms.res.writableFinished) { + //Отправим ошибку клиенту + prms.res.status(500).send(makeErrorText(e)); + } } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -543,6 +554,18 @@ class InQueue extends EventEmitter { if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream"; next(); }); + //Конфигурируем сервер - устанавливаем таймаут обработки сообщений + this.webApp.use((req, res, next) => { + //Устанавливаем таймаут на ответ от сервера + res.setTimeout(this.nTimeout, () => { + //Формируем ошибку + let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса."); + err.status = 504; + //Отправляем ошибку + next(err); + }); + next(); + }); //Конфигурируем сервер - обработка тела сообщения this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений @@ -585,8 +608,11 @@ class InQueue extends EventEmitter { nServiceId: srvs.nId, nServiceFnId: fn.nId }); - //Отправим ошибку клиенту - res.status(500).send(makeErrorText(e)); + //Если мы еще не отдали ответ от сервера + if (!res.writableFinished) { + //Отправим ошибку клиенту + res.status(500).send(makeErrorText(e)); + } } }); //...и собственный обработчик ошибок @@ -597,7 +623,7 @@ class InQueue extends EventEmitter { nServiceFnId: fn.nId }); //Отправим ошибку клиенту - res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); + res.status(err.status || 500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); }); } ); diff --git a/models/obj_config.js b/models/obj_config.js index 563dd5a..f17d7d4 100644 --- a/models/obj_config.js +++ b/models/obj_config.js @@ -49,6 +49,9 @@ const validatePoolMaxInComing = val => val >= 1 && val <= 1000; //Функция проверки значения шага инкремента подключений к БД в пуле обработчика входящих сообщений const validatePoolIncrementInComing = val => val >= 0 && val <= 1000; +//Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений +const validateTimeoutInComing = val => val >= 0; + //Схема валидации общих параметров сервера приложений const common = new Schema({ //Наименование сервера приложений @@ -323,6 +326,17 @@ const inComing = new Schema({ validatePoolIncrementInComing: path => `Значение шага инкремента подключений к БД в пуле обработчика входящих сообщений (${path}) должно быть целым числом в диапазоне от 0 до 1000` } + }, + //Время ожидания отработки входящего сообщения (мс) + nTimeout: { + type: Number, + required: true, + use: { validateTimeoutInComing }, + message: { + type: path => `Время ожидания отработки входящего сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указано время ожидания отработки входящего сообщения (${path})`, + validateTimeoutInComing: path => `Время ожидания отработки входящего сообщения (${path}) должно быть неотрицательным целым числом` + } } }); -- 2.34.1 From 061b7f2fad8fe15e26f5e65d7cd4f9755e0d5304 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Fri, 18 Jul 2025 17:16:43 +0300 Subject: [PATCH 2/2] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-255=20-=20=D0=94?= =?UTF-8?q?=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BF?= =?UTF-8?q?=D0=B0=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D0=B0=20=D0=B2=D1=80?= =?UTF-8?q?=D0=B5=D0=BC=D0=B5=D0=BD=D0=B8=20=D0=BE=D0=B6=D0=B8=D0=B4=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D0=BE=D1=82=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=BA=D0=B8=20=D0=B2=D1=85=D0=BE=D0=B4=D1=8F=D1=89=D0=B5=D0=B3?= =?UTF-8?q?=D0=BE=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 4 ++-- core/in_queue.js | 32 ++++++++++++++++++-------------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/config.js b/config.js index b744df7..9b6511a 100644 --- a/config.js +++ b/config.js @@ -71,8 +71,8 @@ let inComing = { nPoolMax: 10, //Шаг инкремента подключений к БД в пуле обработчика входящих сообщений nPoolIncrement: 0, - //Время ожидания отработки входящего сообщения (мс) - nTimeout: 120000 + //Время ожидания отработки входящего сообщения (мс, 0 - не применять (не отменяет таймаут по умолчанию, который может быть установлен платформой)) + nTimeout: 0 }; //Параметры подключения к Kafka diff --git a/core/in_queue.js b/core/in_queue.js index bf873fa..cee141a 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -93,8 +93,6 @@ class InQueue extends EventEmitter { //Внешние подключения this.kafkaConnections = []; this.mqttConnections = []; - //Время ожидания отработки входящего сообщения - this.nTimeout = prms.inComing.nTimeout; } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } @@ -348,7 +346,10 @@ class InQueue extends EventEmitter { }); } else { //Или расскажем об ошибке - throw new ServerError(SERR_WEB_SERVER, "Истекло время ожидания обработки входящего запроса."); + throw new ServerError( + SERR_WEB_SERVER, + "Истекло время ожидания обработки входящего запроса. Канал закрыт. Клиенту был отправлен ответ." + ); } } catch (e) { //Тема и текст уведомления об ошибке @@ -554,18 +555,21 @@ class InQueue extends EventEmitter { if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream"; next(); }); - //Конфигурируем сервер - устанавливаем таймаут обработки сообщений - this.webApp.use((req, res, next) => { - //Устанавливаем таймаут на ответ от сервера - res.setTimeout(this.nTimeout, () => { - //Формируем ошибку - let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса."); - err.status = 504; - //Отправляем ошибку - next(err); + //Если требуется установить таймаут на обработку сообщений + if (this.inComing.nTimeout !== 0) { + //Конфигурируем сервер - устанавливаем таймаут обработки сообщений + this.webApp.use((req, res, next) => { + //Устанавливаем таймаут на ответ от сервера + res.setTimeout(this.inComing.nTimeout, () => { + //Формируем ошибку + let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса."); + err.status = 504; + //Отправляем ошибку + next(err); + }); + next(); }); - next(); - }); + } //Конфигурируем сервер - обработка тела сообщения this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений -- 2.34.1