ЦИТК-255 - Добавление параметра времени ожидания отработки входящего сообщения

This commit is contained in:
Dollerino 2025-07-17 12:01:44 +03:00
parent 81719ada38
commit f75269ddc7
3 changed files with 60 additions and 18 deletions

View File

@ -70,7 +70,9 @@ let inComing = {
//Максимальный размер пула подключений к БД для обработчика входящих сообщений //Максимальный размер пула подключений к БД для обработчика входящих сообщений
nPoolMax: 10, nPoolMax: 10,
//Шаг инкремента подключений к БД в пуле обработчика входящих сообщений //Шаг инкремента подключений к БД в пуле обработчика входящих сообщений
nPoolIncrement: 0 nPoolIncrement: 0,
//Время ожидания отработки входящего сообщения (мс)
nTimeout: 120000
}; };
//Параметры подключения к Kafka //Параметры подключения к Kafka

View File

@ -93,6 +93,8 @@ class InQueue extends EventEmitter {
//Внешние подключения //Внешние подключения
this.kafkaConnections = []; this.kafkaConnections = [];
this.mqttConnections = []; this.mqttConnections = [];
//Время ожидания отработки входящего сообщения
this.nTimeout = prms.inComing.nTimeout;
} else { } else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
} }
@ -329,19 +331,25 @@ class InQueue extends EventEmitter {
} }
} }
} }
//Всё успешно - отдаём результат клиенту //Если мы еще не отдали ответ от сервера
if (bStopPropagation === false) { if (!prms.res.writableFinished) {
if (optionsResp.headers) prms.res.set(optionsResp.headers); //Всё успешно - отдаём результат клиенту
prms.res.status(optionsResp.statusCode || 200).send(blResp); if (bStopPropagation === false) {
if (optionsResp.headers) prms.res.set(optionsResp.headers);
prms.res.status(optionsResp.statusCode || 200).send(blResp);
}
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_WEB_SERVER, "Истекло время ожидания обработки входящего запроса.");
} }
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
} catch (e) { } catch (e) {
//Тема и текст уведомления об ошибке //Тема и текст уведомления об ошибке
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`; let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`;
@ -375,8 +383,11 @@ class InQueue extends EventEmitter {
sMessage sMessage
}); });
} }
//Отправим ошибку клиенту //Если мы еще не отдали ответ от сервера
prms.res.status(500).send(makeErrorText(e)); if (!prms.res.writableFinished) {
//Отправим ошибку клиенту
prms.res.status(500).send(makeErrorText(e));
}
} }
} else { } else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -543,6 +554,18 @@ class InQueue extends EventEmitter {
if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream"; if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream";
next(); next();
}); });
//Конфигурируем сервер - устанавливаем таймаут обработки сообщений
this.webApp.use((req, res, next) => {
//Устанавливаем таймаут на ответ от сервера
res.setTimeout(this.nTimeout, () => {
//Формируем ошибку
let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса.");
err.status = 504;
//Отправляем ошибку
next(err);
});
next();
});
//Конфигурируем сервер - обработка тела сообщения //Конфигурируем сервер - обработка тела сообщения
this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" }));
//Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений
@ -585,8 +608,11 @@ class InQueue extends EventEmitter {
nServiceId: srvs.nId, nServiceId: srvs.nId,
nServiceFnId: fn.nId nServiceFnId: fn.nId
}); });
//Отправим ошибку клиенту //Если мы еще не отдали ответ от сервера
res.status(500).send(makeErrorText(e)); if (!res.writableFinished) {
//Отправим ошибку клиенту
res.status(500).send(makeErrorText(e));
}
} }
}); });
//...и собственный обработчик ошибок //...и собственный обработчик ошибок
@ -597,7 +623,7 @@ class InQueue extends EventEmitter {
nServiceFnId: fn.nId nServiceFnId: fn.nId
}); });
//Отправим ошибку клиенту //Отправим ошибку клиенту
res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); res.status(err.status || 500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
}); });
} }
); );

View File

@ -49,6 +49,9 @@ const validatePoolMaxInComing = val => val >= 1 && val <= 1000;
//Функция проверки значения шага инкремента подключений к БД в пуле обработчика входящих сообщений //Функция проверки значения шага инкремента подключений к БД в пуле обработчика входящих сообщений
const validatePoolIncrementInComing = val => val >= 0 && val <= 1000; const validatePoolIncrementInComing = val => val >= 0 && val <= 1000;
//Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений
const validateTimeoutInComing = val => val >= 0;
//Схема валидации общих параметров сервера приложений //Схема валидации общих параметров сервера приложений
const common = new Schema({ const common = new Schema({
//Наименование сервера приложений //Наименование сервера приложений
@ -323,6 +326,17 @@ const inComing = new Schema({
validatePoolIncrementInComing: path => validatePoolIncrementInComing: path =>
`Значение шага инкремента подключений к БД в пуле обработчика входящих сообщений (${path}) должно быть целым числом в диапазоне от 0 до 1000` `Значение шага инкремента подключений к БД в пуле обработчика входящих сообщений (${path}) должно быть целым числом в диапазоне от 0 до 1000`
} }
},
//Время ожидания отработки входящего сообщения (мс)
nTimeout: {
type: Number,
required: true,
use: { validateTimeoutInComing },
message: {
type: path => `Время ожидания отработки входящего сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано время ожидания отработки входящего сообщения (${path})`,
validateTimeoutInComing: path => `Время ожидания отработки входящего сообщения (${path}) должно быть неотрицательным целым числом`
}
} }
}); });