diff --git a/core/db_connector.js b/core/db_connector.js index 177b878..c5870c7 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -689,6 +689,39 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Запись параметров сообщения в позицию очереди + async setQueueOptions(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.setQueueOptions, + "Параметры функции сохранения параметров сообщения позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Подготовим параметры + let setQueueOptionsData = _.cloneDeep(prms); + setQueueOptionsData.connection = this.connection; + //Исполняем действие в БД + try { + let res = await this.connector.setQueueOptions(setQueueOptionsData); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём измененную запись + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Считывание ответа на сообщение из позиции очереди async getQueueResp(prms) { if (this.bConnected) { @@ -760,6 +793,39 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Запись параметров ответа на сообщение в позицию очереди + async setQueueOptionsResp(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.setQueueOptionsResp, + "Параметры функции сохранения параметров ответа на сообщение позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Подготовим параметры + let setQueueOptionsRespData = _.cloneDeep(prms); + setQueueOptionsRespData.connection = this.connection; + //Исполняем действие в БД + try { + let res = await this.connector.setQueueOptionsResp(setQueueOptionsRespData); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём измененную запись + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Установить результат обработки записи сервером приложений async setQueueAppSrvResult(prms) { if (this.bConnected) { diff --git a/models/intf_db_connector_module.js b/models/intf_db_connector_module.js index 6186efb..be39898 100644 --- a/models/intf_db_connector_module.js +++ b/models/intf_db_connector_module.js @@ -195,6 +195,16 @@ exports.dbConnectorModule = new Schema({ required: path => `Не реализована функция установки данных сообщения очереди (${path})` } }, + //Установка параметров сообщения очереди + setQueueOptions: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: path => + `Функция установки параметров сообщения очереди (${path}) имеет неверный формат (ожидалось - AsyncFunction)`, + required: path => `Не реализована функция установки параметров сообщения очереди (${path})` + } + }, //Считывание результата обработки сообщения очереди getQueueResp: { use: { validateAsyncFunctionType }, @@ -215,6 +225,17 @@ exports.dbConnectorModule = new Schema({ required: path => `Не реализована функция установки результата обработки сообщения очереди (${path})` } }, + //Установка параметров результата обработки сообщения очереди + setQueueOptionsResp: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: path => + `Функция установки параметров результата обработки сообщения очереди (${path}) имеет неверный формат (ожидалось - AsyncFunction)`, + required: path => + `Не реализована функция установки параметров результата обработки сообщения очереди (${path})` + } + }, //Исполнение обработчика со стороны БД для сообщения очереди execQueuePrc: { use: { validateAsyncFunctionType }, diff --git a/models/obj_queue.js b/models/obj_queue.js index d6b8af8..6a5e7a1 100644 --- a/models/obj_queue.js +++ b/models/obj_queue.js @@ -257,6 +257,26 @@ exports.Queue = new Schema({ `Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указан идентификатор связанного сообщения очереди обмена (${path})` } + }, + //Параметры сообщения + sOptions: { + type: String, + required: false, + message: { + type: path => + `Параметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + required: path => `Не указаны параметры сообщения очереди обмена (${path})` + } + }, + //Параметры ответа + sOptionsResp: { + type: String, + required: false, + message: { + type: path => + `Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})` + } } }); diff --git a/models/prms_db_connector.js b/models/prms_db_connector.js index 46a028f..3dea388 100644 --- a/models/prms_db_connector.js +++ b/models/prms_db_connector.js @@ -455,6 +455,29 @@ exports.setQueueMsg = new Schema({ } }).validator({ required: val => val === null || val }); +//Схема валидации параметров функции записи параметров сообщения в позицию очереди +exports.setQueueOptions = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор позиции очереди (${path})` + } + }, + //Параметры сообщения очереди обмена + sOptions: { + type: String, + required: true, + message: { + validateBuffer: path => + `Парамметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + required: path => `Не указаны параметры сообщения очереди обмена (${path})` + } + } +}).validator({ required: val => val === null || val === 0 || val }); + //Схема валидации параметров функции считывание ответа на сообщение из позиции очереди exports.getQueueResp = new Schema({ //Идентификатор позиции очереди @@ -503,6 +526,29 @@ exports.setQueueResp = new Schema({ } }).validator({ required: val => val === null || val === 0 || val }); +//Схема валидации параметров функции записи параметров ответа на сообщение в позицию очереди +exports.setQueueOptionsResp = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор позиции очереди (${path})` + } + }, + //Параметры ответа на сообщение очереди обмена + sOptionsResp: { + type: String, + required: true, + message: { + validateBuffer: path => + `Парамметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`, + required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})` + } + } +}).validator({ required: val => val === null || val === 0 || val }); + //Схема валидации параметров функции установки результата обработки позиции очереди exports.setQueueAppSrvResult = new Schema({ //Идентификатор позиции очереди diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index ff7fe36..1afc378 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -316,6 +316,25 @@ const setQueueMsg = async prms => { } }; +//Установка параметров сообщения очереди +const setQueueOptions = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_OPTIONS_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS => :SOPTIONS, RCQUEUE => :RCQUEUE); END;", + { + NEXSQUEUE: prms.nQueueId, + SOPTIONS: prms.sOptions, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { outFormat: oracledb.OBJECT, autoCommit: true } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + //Считывание результата обработки сообщения очереди const getQueueResp = async prms => { try { @@ -358,6 +377,25 @@ const setQueueResp = async prms => { } }; +//Установка параметров результата обработки сообщения очереди +const setQueueOptionsResp = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_OPTIONS_RESP_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS_RESP => :SOPTIONS_RESP, RCQUEUE => :RCQUEUE); END;", + { + NEXSQUEUE: prms.nQueueId, + SOPTIONS_RESP: prms.sOptionsResp, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { outFormat: oracledb.OBJECT, autoCommit: true } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + //Исполнение обработчика со стороны БД для сообщения очереди const execQueuePrc = async prms => { try { @@ -397,6 +435,8 @@ exports.getQueueOutgoing = getQueueOutgoing; exports.setQueueState = setQueueState; exports.getQueueMsg = getQueueMsg; exports.setQueueMsg = setQueueMsg; +exports.setQueueOptions = setQueueOptions; exports.getQueueResp = getQueueResp; exports.setQueueResp = setQueueResp; +exports.setQueueOptionsResp = setQueueOptionsResp; exports.execQueuePrc = execQueuePrc;