diff --git a/core/db_connector.js b/core/db_connector.js index 7a7e261..0018459 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -540,6 +540,35 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Установить признак "В работе" позиции очереди + async setInProgress(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.setInProgress, + `Параметры функции установки признака "В работе" позиции очереди` + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Подготовим параметры + let setInProgressData = deepClone(prms); + setInProgressData.connection = this.connection; + try { + //Исполняем действие в БД + await this.connector.setInProgress(setInProgressData); + //Успешно - возвращаем ничего + return; + } catch (e) { + throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Считать очередную порцию исходящих сообщений async getOutgoing(prms) { if (this.bConnected) { diff --git a/core/out_queue.js b/core/out_queue.js index 53c41a4..3d7e8fd 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -173,15 +173,18 @@ class OutQueue extends EventEmitter { } } //Останов обработчика - stopQueueProcessor(prms) { + async stopQueueProcessor(prms) { //Проверяем структуру переданного объекта для останова обработчика let sCheckResult = validateObject(prms, prmsOutQueueSchema.stopQueueProcessor, "Параметры функции останова обработчика сообщения очереди"); //Если структура объекта в норме if (!sCheckResult) { //Удаляем идентификатор позиции очереди из списка обрабатываемых this.rmInProgress({ nQueueId: prms.nQueueId }); - //Завершаем дочерний процесс обработчика - prms.proc.kill(); + //Сбрасываем признак "В работе" позиции очереди + await this.dbConn.setInProgress({ + nQueueId: prms.nQueueId, + nInProgress: objQueueSchema.NQUEUE_IN_PROGRESS_NO + }); //Увеличиваем количество доступных обработчиков this.nWorkersLeft++; } else { @@ -295,7 +298,8 @@ class OutQueue extends EventEmitter { if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms); //Останавливаем обработчик и инкрементируем флаг их доступного количества try { - this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); + //Завершаем дочерний процесс обработчика + proc.kill(); } catch (e) { //Отразим в протоколе ошибку останова await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, { @@ -325,7 +329,8 @@ class OutQueue extends EventEmitter { if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_ERR) await this.notifyMessageProcessError(prms); //Останавливаем обработчик и инкрементируем флаг их доступного количества try { - this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc }); + //Завершаем дочерний процесс обработчика + proc.kill(); } catch (e) { //Отразим в протоколе ошибку останова await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, { @@ -334,7 +339,17 @@ class OutQueue extends EventEmitter { } }); //Перехват останова обработчика - proc.on("exit", code => {}); + proc.on("exit", async code => { + try { + //Завершаем процесс обработки сообщения + this.stopQueueProcessor({ nQueueId: prms.queue.nId }); + } catch (e) { + //Отразим в протоколе ошибку останова + await self.logger.error(`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`, { + nQueueId: prms.queue.nId + }); + } + }); //Запускаем обработчик this.startQueueProcessor({ queue: prms.queue, proc }); } diff --git a/models/obj_queue.js b/models/obj_queue.js index 6b0ddd2..b16931a 100644 --- a/models/obj_queue.js +++ b/models/obj_queue.js @@ -33,6 +33,10 @@ const SQUEUE_EXEC_STATE_DB_ERR = "DB_ERR"; //Ошибка обработки С const SQUEUE_EXEC_STATE_OK = "OK"; //Обработано успешно (строковый код) const SQUEUE_EXEC_STATE_ERR = "ERR"; //Обработано с ошибками (строковый код) +//Значения признака "В работе" сообщения очереди обмена +const NQUEUE_IN_PROGRESS_NO = 0; //Не в работе +const NQUEUE_IN_PROGRESS_YES = 1; //В работе + //Коды результатов исполнения обработчика сообщения const SPRC_RESP_RESULT_OK = "OK"; //Обработано успешно const SPRC_RESP_RESULT_ERR = "ERR"; //Ошибка обработки @@ -70,6 +74,8 @@ exports.SPRC_RESP_RESULT_ERR = SPRC_RESP_RESULT_ERR; exports.SPRC_RESP_RESULT_UNAUTH = SPRC_RESP_RESULT_UNAUTH; exports.NQUEUE_RESET_DATA_NO = NQUEUE_RESET_DATA_NO; exports.NQUEUE_RESET_DATA_YES = NQUEUE_RESET_DATA_YES; +exports.NQUEUE_IN_PROGRESS_NO = NQUEUE_IN_PROGRESS_NO; +exports.NQUEUE_IN_PROGRESS_YES = NQUEUE_IN_PROGRESS_YES; //Схема валидации сообщения очереди обмена exports.Queue = new Schema({ @@ -78,8 +84,7 @@ exports.Queue = new Schema({ type: Number, required: true, message: { - type: path => - `Идентификатор сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Идентификатор сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан идентификатор сообщения очереди обмена (${path})` } }, @@ -88,8 +93,7 @@ exports.Queue = new Schema({ type: Date, required: true, message: { - type: path => - `Дата постановки сообщения в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`, + type: path => `Дата постановки сообщения в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`, required: path => `Не указана дата постановки сообщения в очередь обмена (${path})` } }, @@ -108,8 +112,7 @@ exports.Queue = new Schema({ type: String, required: true, message: { - type: path => - `Пользователь, поставивший сообщение в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Пользователь, поставивший сообщение в очередь обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан пользователь, поставивший сообщение в очередь обмена (${path})` } }, @@ -118,8 +121,7 @@ exports.Queue = new Schema({ type: Number, required: true, message: { - type: path => - `Идентификатор сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Идентификатор сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан идентификатор сервиса-обработчика сообщения очереди обмена (${path})` } }, @@ -128,8 +130,7 @@ exports.Queue = new Schema({ type: String, required: true, message: { - type: path => - `Код сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Код сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан код сервиса-обработчика сообщения очереди обмена (${path})` } }, @@ -148,8 +149,7 @@ exports.Queue = new Schema({ type: String, required: true, message: { - type: path => - `Код функции сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Код функции сервиса-обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан код функции сервиса-обработчика сообщения очереди обмена (${path})` } }, @@ -158,8 +158,7 @@ exports.Queue = new Schema({ type: Date, required: false, message: { - type: path => - `Дата обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`, + type: path => `Дата обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Date)`, required: path => `Не указана дата обработки сообщения очереди обмена (${path})` } }, @@ -178,8 +177,7 @@ exports.Queue = new Schema({ type: Number, required: true, message: { - type: path => - `Количество попыток обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Количество попыток обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указано количество попыток обработки сообщения очереди обмена (${path})` } }, @@ -209,8 +207,7 @@ exports.Queue = new Schema({ ], required: true, message: { - type: path => - `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`, required: path => `Не указано состояние обработки сообщения очереди обмена (${path})` } @@ -231,10 +228,8 @@ exports.Queue = new Schema({ ], required: true, message: { - type: path => - `Строковый код состояния обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, - enum: path => - `Значение строкового кода состояния обработки сообщения очереди обмена (${path}) не поддерживается`, + type: path => `Строковый код состояния обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + enum: path => `Значение строкового кода состояния обработки сообщения очереди обмена (${path}) не поддерживается`, required: path => `Не указан строковый код состояния обработки сообщения очереди обмена (${path})` } }, @@ -243,8 +238,7 @@ exports.Queue = new Schema({ type: String, required: false, message: { - type: path => - `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})` } }, @@ -253,8 +247,7 @@ exports.Queue = new Schema({ type: Number, required: false, message: { - type: path => - `Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан идентификатор связанного сообщения очереди обмена (${path})` } }, @@ -263,8 +256,7 @@ exports.Queue = new Schema({ type: String, required: false, message: { - type: path => - `Параметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + type: path => `Параметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, required: path => `Не указаны параметры сообщения очереди обмена (${path})` } }, @@ -273,8 +265,7 @@ exports.Queue = new Schema({ type: String, required: false, message: { - type: path => - `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + type: path => `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})` } }, @@ -283,8 +274,7 @@ exports.Queue = new Schema({ type: Number, required: true, message: { - type: path => - `Приоритет в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + type: path => `Приоритет в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан приоритет в очереди обмена (${path})` } } @@ -297,8 +287,7 @@ exports.QueueMsg = new Schema({ type: Buffer, required: true, message: { - type: path => - `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`, + type: path => `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`, required: path => `Не указаны данные сообщения очереди обмена (${path})` } } @@ -311,8 +300,7 @@ exports.QueueResp = new Schema({ type: Buffer, required: true, message: { - type: path => - `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`, + type: path => `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - Buffer)`, required: path => `Не указаны данные ответа сообщения очереди обмена (${path})` } } @@ -326,8 +314,7 @@ exports.QueuePrcResult = new Schema({ enum: [SPRC_RESP_RESULT_OK, SPRC_RESP_RESULT_ERR, SPRC_RESP_RESULT_UNAUTH], required: true, message: { - type: path => - `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`, required: path => `Не указано состояние обработки сообщения очереди обмена (${path})` } @@ -337,8 +324,7 @@ exports.QueuePrcResult = new Schema({ type: String, required: false, message: { - type: path => - `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + type: path => `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})` } }, @@ -347,8 +333,7 @@ exports.QueuePrcResult = new Schema({ type: String, required: true, message: { - type: path => - `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})` } } diff --git a/models/prms_db_connector.js b/models/prms_db_connector.js index 8cc03db..a4d8049 100644 --- a/models/prms_db_connector.js +++ b/models/prms_db_connector.js @@ -22,7 +22,9 @@ const { NQUEUE_EXEC_STATE_OK, NQUEUE_EXEC_STATE_ERR, NQUEUE_RESET_DATA_NO, - NQUEUE_RESET_DATA_YES + NQUEUE_RESET_DATA_YES, + NQUEUE_IN_PROGRESS_NO, + NQUEUE_IN_PROGRESS_YES } = require("./obj_queue"); //Схемы валидации сообщения очереди обмена //---------- @@ -328,6 +330,29 @@ exports.putQueue = new Schema({ } }); +//Схема валидации параметров функции установки признака "В работе" позиции очереди +exports.setInProgress = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор позиции очереди (${path})` + } + }, + nInProgress: { + type: Number, + enum: [NQUEUE_IN_PROGRESS_NO, NQUEUE_IN_PROGRESS_YES], + required: true, + message: { + type: path => `Признак "В работе" (${path}) имеет некорректный тип данных (ожидалось - Number)`, + enum: path => `Значение признака "В работе" (${path}) не поддерживается`, + required: path => `Не указано значение признака "В работе" (${path})` + } + } +}); + //Схема валидации параметров функции считывания исходящих сообщений exports.getOutgoing = new Schema({ //Количество считываемых сообщений очереди diff --git a/models/prms_out_queue.js b/models/prms_out_queue.js index 4d90322..690e8f0 100644 --- a/models/prms_out_queue.js +++ b/models/prms_out_queue.js @@ -42,8 +42,7 @@ exports.OutQueue = new Schema({ type: DBConnector, required: true, message: { - type: path => - `Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`, + type: path => `Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`, required: path => `Не указан объект для взаимодействия с БД (${path})` } }, @@ -52,8 +51,7 @@ exports.OutQueue = new Schema({ type: Logger, required: true, message: { - type: path => - `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, + type: path => `Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`, required: path => `Не указаны объект для протоколирования работы (${path})` } }, @@ -62,8 +60,7 @@ exports.OutQueue = new Schema({ type: Notifier, required: true, message: { - type: path => - `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`, + type: path => `Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`, required: path => `Не указан объект для рассылки уведомлений (${path})` } }, @@ -132,8 +129,7 @@ exports.startQueueProcessor = new Schema({ use: { validateChildProcess }, required: true, message: { - validateChildProcess: path => - `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`, + validateChildProcess: path => `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`, required: path => `Не указан процесс обработчика (${path})` } } @@ -149,16 +145,6 @@ exports.stopQueueProcessor = new Schema({ type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан идентификатор сообщения (${path})` } - }, - //Процесс обработчика - proc: { - use: { validateChildProcess }, - required: true, - message: { - validateChildProcess: path => - `Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`, - required: path => `Не указан процесс обработчика (${path})` - } } }); diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index daab4a7..0f95906 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -412,6 +412,18 @@ const putQueue = async prms => { return queueData.RCQUEUE[0]; }; +//Установка признака "В работе" в сообщении очереди +const setInProgress = async prms => { + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_IN_PROGRESS_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + NIN_PROGRESS: prms.nInProgress + } + }); +}; + //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { let queueOutgoingData = await executeStored({ @@ -566,6 +578,7 @@ exports.getServiceExpiredQueueInfo = getServiceExpiredQueueInfo; exports.log = log; exports.getQueue = getQueue; exports.putQueue = putQueue; +exports.setInProgress = setInProgress; exports.getQueueOutgoing = getQueueOutgoing; exports.setQueueState = setQueueState; exports.getQueueMsg = getQueueMsg; diff --git a/modules/parus_pg_db.js b/modules/parus_pg_db.js index b780036..b3fb477 100644 --- a/modules/parus_pg_db.js +++ b/modules/parus_pg_db.js @@ -373,6 +373,18 @@ const putQueue = async prms => { return queueData.RCQUEUE[0]; }; +//Установка признака "В работе" в сообщении очереди +const setInProgress = async prms => { + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_IN_PROGRESS_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + NIN_PROGRESS: prms.nInProgress + } + }); +}; + //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { let queueOutgoingData = await executeStored({ @@ -527,6 +539,7 @@ exports.getServiceExpiredQueueInfo = getServiceExpiredQueueInfo; exports.log = log; exports.getQueue = getQueue; exports.putQueue = putQueue; +exports.setInProgress = setInProgress; exports.getQueueOutgoing = getQueueOutgoing; exports.setQueueState = setQueueState; exports.getQueueMsg = getQueueMsg;