ЦИТК-1029 - Добавление признака "В работе" для очереди обмена

This commit is contained in:
Dollerino 2026-03-30 15:03:02 +03:00
parent 380df706d8
commit 92f23914af
7 changed files with 133 additions and 67 deletions

View File

@ -540,6 +540,35 @@ class DBConnector extends EventEmitter {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Установить признак "В работе" позиции очереди
async setInProgress(prms) {
if (this.bConnected) {
//Проверяем структуру переданных параметров
let sCheckResult = validateObject(
prms,
prmsDBConnectorSchema.setInProgress,
`Параметры функции установки признака "В работе" позиции очереди`
);
//Если структура объекта в норме
if (!sCheckResult) {
//Подготовим параметры
let setInProgressData = deepClone(prms);
setInProgressData.connection = this.connection;
try {
//Исполняем действие в БД
await this.connector.setInProgress(setInProgressData);
//Успешно - возвращаем ничего
return;
} catch (e) {
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} else {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Считать очередную порцию исходящих сообщений
async getOutgoing(prms) {
if (this.bConnected) {

View File

@ -173,15 +173,18 @@ class OutQueue extends EventEmitter {
}
}
//Останов обработчика
stopQueueProcessor(prms) {
async stopQueueProcessor(prms) {
//Проверяем структуру переданного объекта для останова обработчика
let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди");
//Если структура объекта в норме
if (!sCheckResult) {
//Удаляем идентификатор позиции очереди из списка обрабатываемых
this.rmInProgress({ nQueueId: prms.nQueueId });
//Завершаем дочерний процесс обработчика
prms.proc.kill();
//Сбрасываем признак "В работе" позиции очереди
await this.dbConn.setInProgress({
nQueueId: prms.nQueueId,
nInProgress: objQueueSchema.NQUEUE_IN_PROGRESS_NO
});
//Увеличиваем количество доступных обработчиков
this.nWorkersLeft++;
} else {
@ -295,7 +298,8 @@ class OutQueue extends EventEmitter {
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms);
//Останавливаем обработчик и инкрементируем флаг их доступного количества
try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
//Завершаем дочерний процесс обработчика
proc.kill();
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
@ -325,7 +329,8 @@ class OutQueue extends EventEmitter {
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms);
//Останавливаем обработчик и инкрементируем флаг их доступного количества
try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
//Завершаем дочерний процесс обработчика
proc.kill();
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
@ -334,7 +339,17 @@ class OutQueue extends EventEmitter {
}
});
//Перехват останова обработчика
proc.on("exit", code => {});
proc.on("exit", async code => {
try {
//Завершаем процесс обработки сообщения
this.stopQueueProcessor({ nQueueId: prms.queue.nId });
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
});
//Запускаем обработчик
this.startQueueProcessor({ queue: prms.queue, proc });
}

View File

@ -33,6 +33,10 @@ const SQUEUE_EXEC_STATE_DB_ERR = "DB_ERR"; //Ошибка обработки С
const SQUEUE_EXEC_STATE_OK = "OK"; //Обработано успешно (строковый код)
const SQUEUE_EXEC_STATE_ERR = "ERR"; //Обработано с ошибками (строковый код)
//Значения признака "В работе" сообщения очереди обмена
const NQUEUE_IN_PROGRESS_NO = 0; //Не в работе
const NQUEUE_IN_PROGRESS_YES = 1; //В работе
//Коды результатов исполнения обработчика сообщения
const SPRC_RESP_RESULT_OK = "OK"; //Обработано успешно
const SPRC_RESP_RESULT_ERR = "ERR"; //Ошибка обработки
@ -70,6 +74,8 @@ exports.SPRC_RESP_RESULT_ERR = SPRC_RESP_RESULT_ERR;
exports.SPRC_RESP_RESULT_UNAUTH = SPRC_RESP_RESULT_UNAUTH;
exports.NQUEUE_RESET_DATA_NO = NQUEUE_RESET_DATA_NO;
exports.NQUEUE_RESET_DATA_YES = NQUEUE_RESET_DATA_YES;
exports.NQUEUE_IN_PROGRESS_NO = NQUEUE_IN_PROGRESS_NO;
exports.NQUEUE_IN_PROGRESS_YES = NQUEUE_IN_PROGRESS_YES;
//Схема валидации сообщения очереди обмена
exports.Queue = new Schema({
@ -78,8 +84,7 @@ exports.Queue = new Schema({
type: Number,
required: true,
message: {
type: path =>
`Идентификатор сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
type: path => `Идентификатор сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения очереди обмена (${path})`
}
},
@ -88,8 +93,7 @@ exports.Queue = new Schema({
type: Date,
required: true,
message: {
type: path =>
`Дата постановки сообщения в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`,
type: path => `Дата постановки сообщения в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`,
required: path => `Не указана дата постановки сообщения в очередь обмена (${path})`
}
},
@ -108,8 +112,7 @@ exports.Queue = new Schema({
type: String,
required: true,
message: {
type: path =>
`Пользователь, поставивший сообщение в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Пользователь, поставивший сообщение в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан пользователь, поставивший сообщение в очередь обмена (${path})`
}
},
@ -118,8 +121,7 @@ exports.Queue = new Schema({
type: Number,
required: true,
message: {
type: path =>
`Идентификатор сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
type: path => `Идентификатор сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сервиса-обработчика сообщения очереди обмена (${path})`
}
},
@ -128,8 +130,7 @@ exports.Queue = new Schema({
type: String,
required: true,
message: {
type: path =>
`Код сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Код сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан код сервиса-обработчика сообщения очереди обмена (${path})`
}
},
@ -148,8 +149,7 @@ exports.Queue = new Schema({
type: String,
required: true,
message: {
type: path =>
`Код функции сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Код функции сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан код функции сервиса-обработчика сообщения очереди обмена (${path})`
}
},
@ -158,8 +158,7 @@ exports.Queue = new Schema({
type: Date,
required: false,
message: {
type: path =>
`Дата обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`,
type: path => `Дата обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`,
required: path => `Не указана дата обработки сообщения очереди обмена (${path})`
}
},
@ -178,8 +177,7 @@ exports.Queue = new Schema({
type: Number,
required: true,
message: {
type: path =>
`Количество попыток обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
type: path => `Количество попыток обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указано количество попыток обработки сообщения очереди обмена (${path})`
}
},
@ -209,8 +207,7 @@ exports.Queue = new Schema({
],
required: true,
message: {
type: path =>
`Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
type: path => `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`,
required: path => `Не указано состояние обработки сообщения очереди обмена (${path})`
}
@ -231,10 +228,8 @@ exports.Queue = new Schema({
],
required: true,
message: {
type: path =>
`Строковый код состояния обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path =>
`Значение строкового кода состояния обработки сообщения очереди обмена (${path}) не поддерживается`,
type: path => `Строковый код состояния обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение строкового кода состояния обработки сообщения очереди обмена (${path}) не поддерживается`,
required: path => `Не указан строковый код состояния обработки сообщения очереди обмена (${path})`
}
},
@ -243,8 +238,7 @@ exports.Queue = new Schema({
type: String,
required: false,
message: {
type: path =>
`Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})`
}
},
@ -253,8 +247,7 @@ exports.Queue = new Schema({
type: Number,
required: false,
message: {
type: path =>
`Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
type: path => `Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор связанного сообщения очереди обмена (${path})`
}
},
@ -263,8 +256,7 @@ exports.Queue = new Schema({
type: String,
required: false,
message: {
type: path =>
`Параметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
type: path => `Параметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
required: path => `Не указаны параметры сообщения очереди обмена (${path})`
}
},
@ -273,8 +265,7 @@ exports.Queue = new Schema({
type: String,
required: false,
message: {
type: path =>
`Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
type: path => `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})`
}
},
@ -283,8 +274,7 @@ exports.Queue = new Schema({
type: Number,
required: true,
message: {
type: path =>
`Приоритет в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
type: path => `Приоритет в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан приоритет в очереди обмена (${path})`
}
}
@ -297,8 +287,7 @@ exports.QueueMsg = new Schema({
type: Buffer,
required: true,
message: {
type: path =>
`Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`,
type: path => `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`,
required: path => `Не указаны данные сообщения очереди обмена (${path})`
}
}
@ -311,8 +300,7 @@ exports.QueueResp = new Schema({
type: Buffer,
required: true,
message: {
type: path =>
`Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`,
type: path => `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`,
required: path => `Не указаны данные ответа сообщения очереди обмена (${path})`
}
}
@ -326,8 +314,7 @@ exports.QueuePrcResult = new Schema({
enum: [SPRC_RESP_RESULT_OK, SPRC_RESP_RESULT_ERR, SPRC_RESP_RESULT_UNAUTH],
required: true,
message: {
type: path =>
`Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`,
required: path => `Не указано состояние обработки сообщения очереди обмена (${path})`
}
@ -337,8 +324,7 @@ exports.QueuePrcResult = new Schema({
type: String,
required: false,
message: {
type: path =>
`Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
type: path => `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})`
}
},
@ -347,8 +333,7 @@ exports.QueuePrcResult = new Schema({
type: String,
required: true,
message: {
type: path =>
`Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})`
}
}

View File

@ -22,7 +22,9 @@ const {
NQUEUE_EXEC_STATE_OK,
NQUEUE_EXEC_STATE_ERR,
NQUEUE_RESET_DATA_NO,
NQUEUE_RESET_DATA_YES
NQUEUE_RESET_DATA_YES,
NQUEUE_IN_PROGRESS_NO,
NQUEUE_IN_PROGRESS_YES
} = require("./obj_queue"); //Схемы валидации сообщения очереди обмена
//----------
@ -328,6 +330,29 @@ exports.putQueue = new Schema({
}
});
//Схема валидации параметров функции установки признака "В работе" позиции очереди
exports.setInProgress = new Schema({
//Идентификатор позиции очереди
nQueueId: {
type: Number,
required: true,
message: {
type: path => `Идентификатор позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор позиции очереди (${path})`
}
},
nInProgress: {
type: Number,
enum: [NQUEUE_IN_PROGRESS_NO, NQUEUE_IN_PROGRESS_YES],
required: true,
message: {
type: path => `Признак "В работе" (${path}) имеет некорректный тип данных (ожидалось - Number)`,
enum: path => `Значение признака "В работе" (${path}) не поддерживается`,
required: path => `Не указано значение признака "В работе" (${path})`
}
}
});
//Схема валидации параметров функции считывания исходящих сообщений
exports.getOutgoing = new Schema({
//Количество считываемых сообщений очереди

View File

@ -42,8 +42,7 @@ exports.OutQueue = new Schema({
type: DBConnector,
required: true,
message: {
type: path =>
`Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`,
type: path => `Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`,
required: path => `Не указан объект для взаимодействия с БД (${path})`
}
},
@ -52,8 +51,7 @@ exports.OutQueue = new Schema({
type: Logger,
required: true,
message: {
type: path =>
`Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`,
type: path => `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`,
required: path => `Не указаны объект для протоколирования работы (${path})`
}
},
@ -62,8 +60,7 @@ exports.OutQueue = new Schema({
type: Notifier,
required: true,
message: {
type: path =>
`Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`,
type: path => `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`,
required: path => `Не указан объект для рассылки уведомлений (${path})`
}
},
@ -132,8 +129,7 @@ exports.startQueueProcessor = new Schema({
use: { validateChildProcess },
required: true,
message: {
validateChildProcess: path =>
`Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`,
validateChildProcess: path => `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`,
required: path => `Не указан процесс обработчика (${path})`
}
}
@ -149,16 +145,6 @@ exports.stopQueueProcessor = new Schema({
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})`
}
},
//Процесс обработчика
proc: {
use: { validateChildProcess },
required: true,
message: {
validateChildProcess: path =>
`Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`,
required: path => `Не указан процесс обработчика (${path})`
}
}
});

View File

@ -412,6 +412,18 @@ const putQueue = async prms => {
return queueData.RCQUEUE[0];
};
//Установка признака "В работе" в сообщении очереди
const setInProgress = async prms => {
await executeStored({
connection: prms.connection,
sName: "PKG_EXS.QUEUE_IN_PROGRESS_SET",
inPrms: {
NEXSQUEUE: prms.nQueueId,
NIN_PROGRESS: prms.nInProgress
}
});
};
//Считывание очередной порции исходящих сообщений из очереди
const getQueueOutgoing = async prms => {
let queueOutgoingData = await executeStored({
@ -566,6 +578,7 @@ exports.getServiceExpiredQueueInfo = getServiceExpiredQueueInfo;
exports.log = log;
exports.getQueue = getQueue;
exports.putQueue = putQueue;
exports.setInProgress = setInProgress;
exports.getQueueOutgoing = getQueueOutgoing;
exports.setQueueState = setQueueState;
exports.getQueueMsg = getQueueMsg;

View File

@ -373,6 +373,18 @@ const putQueue = async prms => {
return queueData.RCQUEUE[0];
};
//Установка признака "В работе" в сообщении очереди
const setInProgress = async prms => {
await executeStored({
connection: prms.connection,
sName: "PKG_EXS$QUEUE_IN_PROGRESS_SET",
inPrms: {
NEXSQUEUE: prms.nQueueId,
NIN_PROGRESS: prms.nInProgress
}
});
};
//Считывание очередной порции исходящих сообщений из очереди
const getQueueOutgoing = async prms => {
let queueOutgoingData = await executeStored({
@ -527,6 +539,7 @@ exports.getServiceExpiredQueueInfo = getServiceExpiredQueueInfo;
exports.log = log;
exports.getQueue = getQueue;
exports.putQueue = putQueue;
exports.setInProgress = setInProgress;
exports.getQueueOutgoing = getQueueOutgoing;
exports.setQueueState = setQueueState;
exports.getQueueMsg = getQueueMsg;