ЦИТК-1030. Добавление возможности управления предельным временем исполнения процесса обработчика для исходящих сообщений

This commit is contained in:
boa604 2026-05-22 16:12:30 +03:00
parent e03f1bd503
commit af995d2765
5 changed files with 102 additions and 3 deletions

View File

@ -56,7 +56,9 @@ let outGoing = {
//Шаг инкремента подключений к БД в пуле обработчика исходящих сообщений //Шаг инкремента подключений к БД в пуле обработчика исходящих сообщений
nPoolIncrement: 0, nPoolIncrement: 0,
//Глобальный адрес прокси-сервера //Глобальный адрес прокси-сервера
sProxy: null sProxy: null,
//Таймаут параллельного процесса обработки исходящего сообщения (мс, если задан атрибут "nTimeoutWorker" функции сервиса обмена - игнорируется, 0 - не применять)
nTimeoutWorker: 0
}; };
//Параметры обработки очереди входящих сообщений //Параметры обработки очереди входящих сообщений

View File

@ -172,14 +172,57 @@ class OutQueue extends EventEmitter {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
} }
} }
//Сброс таймера принудительного останова параллельного процесса обработчика
clearWorkerTerminateTimeout(proc) {
if (proc?.terminateTimeoutPid) {
clearTimeout(proc.terminateTimeoutPid);
proc.terminateTimeoutPid = undefined;
}
}
//Определение таймаута параллельного процесса обработчика
resolveWorkerTimeoutMs(serviceFn) {
if (serviceFn?.nTimeoutWorker > 0) return serviceFn.nTimeoutWorker;
if (this.outGoing.nTimeoutWorker > 0) return this.outGoing.nTimeoutWorker;
return 0;
}
//Останов обработчика //Останов обработчика
async stopQueueProcessor(prms) { async stopQueueProcessor(prms) {
//Проверяем структуру переданного объекта для останова обработчика //Проверяем структуру переданного объекта для останова обработчика
let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди"); let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди");
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Сбросим таймер ожидания останова
if (prms.proc) this.clearWorkerTerminateTimeout(prms.proc);
//Удаляем идентификатор позиции очереди из списка обрабатываемых //Удаляем идентификатор позиции очереди из списка обрабатываемых
this.rmInProgress({ nQueueId: prms.nQueueId }); this.rmInProgress({ nQueueId: prms.nQueueId });
//Завершаем процесс обработчика, если он ещё активен
let killRes = true;
let killErr = false;
if (prms.proc?.connected) {
try {
killRes = prms.proc.kill();
} catch (e) {
killRes = false;
killErr = true;
//Отразим в протоколе ошибку останова
await this.logger.error(`Ошибка останова обработчика исходящего сообщения ${prms.nQueueId}: ${makeErrorText(e)}`, {
nQueueId: prms.nQueueId
});
}
}
if (prms.proc) {
if (!killRes && !killErr)
await this.logger.error(`Процесс обработчика исходящего сообщения ${prms.nQueueId} не был успешно завершен`, {
nQueueId: prms.nQueueId
});
else if (killRes) {
const terminateTimeoutFired = prms.proc.terminateTimeoutFired === true;
const message = `Процесс обработчика исходящего сообщения ${prms.nQueueId} завершен${terminateTimeoutFired ? " (по таймауту)" : ""}`;
const logData = { nQueueId: prms.nQueueId };
if (terminateTimeoutFired) await this.logger.warn(message, logData);
else await this.logger.info(message, logData);
}
}
//Сбрасываем признак "В работе" позиции очереди //Сбрасываем признак "В работе" позиции очереди
await this.dbConn.setInProgress({ await this.dbConn.setInProgress({
nQueueId: prms.nQueueId, nQueueId: prms.nQueueId,
@ -230,6 +273,11 @@ class OutQueue extends EventEmitter {
if (this.nWorkersLeft > 0) { if (this.nWorkersLeft > 0) {
//Переопределим себя для обращения внутри обработчиков событий //Переопределим себя для обращения внутри обработчиков событий
const self = this; const self = this;
//Найдем сервис и функцию обработки сообщения
const service = this.services.find(s => s.nId === prms.queue.nServiceId);
const serviceFn = service?.functions.find(f => f.nId === prms.queue.nServiceFnId);
//Таймаут параллельного процесса обработчика (мс)
const workerTimeout = this.resolveWorkerTimeoutMs(serviceFn);
//Запоминаем текущее количество попыток обработки //Запоминаем текущее количество попыток обработки
const nQueueOldExecCnt = prms.queue.nExecCnt; const nQueueOldExecCnt = prms.queue.nExecCnt;
//Буфер для ошибок (для журнала работы и очереди обмена) //Буфер для ошибок (для журнала работы и очереди обмена)
@ -239,6 +287,8 @@ class OutQueue extends EventEmitter {
const proc = ChildProcess.fork("core/out_queue_processor", { silent: false }); const proc = ChildProcess.fork("core/out_queue_processor", { silent: false });
//Перехват сообщений обработчика //Перехват сообщений обработчика
proc.on("message", async result => { proc.on("message", async result => {
//Сбросим таймер принудительного останова
self.clearWorkerTerminateTimeout(proc);
//Перечитывание не требуется, если выполнено успешно //Перечитывание не требуется, если выполнено успешно
if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) { if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) {
//Перечитываем запись очереди с учетом изменения статуса //Перечитываем запись очереди с учетом изменения статуса
@ -309,6 +359,8 @@ class OutQueue extends EventEmitter {
}); });
//Перехват ошибок обработчика //Перехват ошибок обработчика
proc.on("error", async e => { proc.on("error", async e => {
//Сбросим таймер принудительного останова
self.clearWorkerTerminateTimeout(proc);
//Считываем сообщение изменённое обработчиком //Считываем сообщение изменённое обработчиком
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId }); prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
//Фиксируем ошибку в протоколе работы //Фиксируем ошибку в протоколе работы
@ -339,10 +391,10 @@ class OutQueue extends EventEmitter {
} }
}); });
//Перехват останова обработчика //Перехват останова обработчика
proc.on("exit", async code => { proc.on("exit", async () => {
try { try {
//Завершаем процесс обработки сообщения //Завершаем процесс обработки сообщения
this.stopQueueProcessor({ nQueueId: prms.queue.nId }); await this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
} catch (e) { } catch (e) {
//Отразим в протоколе ошибку останова //Отразим в протоколе ошибку останова
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, { await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
@ -352,6 +404,20 @@ class OutQueue extends EventEmitter {
}); });
//Запускаем обработчик //Запускаем обработчик
this.startQueueProcessor({ queue: prms.queue, proc }); this.startQueueProcessor({ queue: prms.queue, proc });
//Принудительная остановка процесса обработчика по таймауту (мс)
if (workerTimeout > 0) {
proc.terminateTimeoutPid = setTimeout(async () => {
//Таймер уже сброшен — обработка завершилась штатно
if (!proc.terminateTimeoutPid) return;
proc.terminateTimeoutPid = undefined;
proc.terminateTimeoutFired = true;
await self.logger.warn(
`Истёк интервал ожидания (${workerTimeout} мс) завершения параллельного процесса обработки исходящего сообщения ${prms.queue.nId}`,
{ nQueueId: prms.queue.nId }
);
proc.emit("error", new Error(`Истёк интервал ожидания (${workerTimeout} мс) завершения параллельного процесса`));
}, workerTimeout);
}
} }
} else { } else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);

View File

@ -67,6 +67,9 @@ const validatePoolIncrementInComing = val => val >= 0 && val <= 1000;
//Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений //Функция проверки значения времени ожидания отработки входящего сообщения для обработчика входящих сообщений
const validateTimeoutInComing = val => val >= 0; const validateTimeoutInComing = val => val >= 0;
//Функция проверки значения таймаута параллельного процесса обработчика исходящих сообщений
const validateTimeoutWorkerOutGoing = val => val >= 0 && Number.isInteger(val);
//Функция проверки значения времени ожидания успешного подключения Kafka //Функция проверки значения времени ожидания успешного подключения Kafka
const validateTimeoutKafka = val => val >= 0; const validateTimeoutKafka = val => val >= 0;
@ -293,6 +296,18 @@ const outGoing = new Schema({
type: path => `Адрес прокси-сервера приложения (${path}) имеет некорректный тип данных (ожидалось - String)`, type: path => `Адрес прокси-сервера приложения (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан глобальный адрес прокси-сервера (${path})` required: path => `Не указан глобальный адрес прокси-сервера (${path})`
} }
},
//Таймаут параллельного процесса обработки исходящего сообщения (мс)
nTimeoutWorker: {
type: Number,
required: false,
use: { validateTimeoutWorkerOutGoing },
message: {
type: path =>
`Таймаут параллельного процесса обработки исходящего сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
validateTimeoutWorkerOutGoing: path =>
`Таймаут параллельного процесса обработки исходящего сообщения (${path}) должен быть неотрицательным целым числом`
}
} }
}); });

View File

@ -414,5 +414,13 @@ exports.ServiceFunction = new Schema({
message: { message: {
type: path => `Таймаут асинхронной отправки функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)` type: path => `Таймаут асинхронной отправки функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`
} }
},
//Таймаут параллельного процесса обработки исходящего сообщения (мс)
nTimeoutWorker: {
type: Number,
required: false,
message: {
type: path => `Таймаут параллельного процесса функции сервиса (${path}) имеет некорректный тип данных (ожидалось - Number)`
}
} }
}); });

View File

@ -145,6 +145,14 @@ exports.stopQueueProcessor = new Schema({
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})` required: path => `Не указан идентификатор сообщения (${path})`
} }
},
//Процесс обработчика
proc: {
use: { validateChildProcess },
required: false,
message: {
validateChildProcess: path => `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`
}
} }
}); });