diff --git a/core/db_connector.js b/core/db_connector.js index 27e3ffc..5c8d437 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -426,6 +426,39 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Считывание данных сообщения из позиции очереди + async getQueueMsg(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.getQueueMsg, + "Параметры считывания данных ответа на сообщение позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Подготовим параметры + let getQueueMsgData = _.cloneDeep(prms); + getQueueMsgData.connection = this.connection; + //Исполняем действие в БД + try { + let res = await this.connector.getQueueMsg(getQueueMsgData); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.QueueMsg, "Данные сообщения очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём данные сообщения + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Запись данных сообщения в позицию очереди async setQueueMsg(prms) { if (this.bConnected) { @@ -437,13 +470,13 @@ class DBConnector extends EventEmitter { ); //Если структура объекта в норме if (!sCheckResult) { + //Подготовим параметры + let setQueueMsgData = _.cloneDeep(prms); + if (!setQueueMsgData.blMsg) setQueueMsgData.blMsg = new Buffer(""); + setQueueMsgData.connection = this.connection; //Исполняем действие в БД try { - let res = await this.connector.setQueueMsg({ - nQueueId: prms.nQueueId, - blMsg: prms.blMsg ? prms.blMsg : new Buffer(""), - connection: this.connection - }); + let res = await this.connector.setQueueMsg(setQueueMsgData); //Валидируем полученный ответ sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена"); if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -460,6 +493,43 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Считывание ответа на сообщение из позиции очереди + async getQueueResp(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.getQueueResp, + "Параметры считывания данных ответа на сообщение позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Подготовим параметры + let getQueueRespData = _.cloneDeep(prms); + getQueueRespData.connection = this.connection; + //Исполняем действие в БД + try { + let res = await this.connector.getQueueResp(getQueueRespData); + //Валидируем полученный ответ + sCheckResult = validateObject( + res, + objQueueSchema.QueueResp, + "Данные ответа сообщения очереди обмена" + ); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём данные ответа на сообщение + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Запись ответа на сообщение в позицию очереди async setQueueResp(prms) { if (this.bConnected) { @@ -471,13 +541,13 @@ class DBConnector extends EventEmitter { ); //Если структура объекта в норме if (!sCheckResult) { + //Подготовим параметры + let setQueueRespData = _.cloneDeep(prms); + if (!setQueueRespData.blResp) setQueueRespData.blResp = new Buffer(""); + setQueueRespData.connection = this.connection; //Исполняем действие в БД try { - let res = await this.connector.setQueueResp({ - nQueueId: prms.nQueueId, - blResp: prms.blResp ? prms.blResp : new Buffer(""), - connection: this.connection - }); + let res = await this.connector.setQueueResp(setQueueRespData); //Валидируем полученный ответ sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена"); if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); diff --git a/models/intf_db_connector_module.js b/models/intf_db_connector_module.js index 9d970f1..02da169 100644 --- a/models/intf_db_connector_module.js +++ b/models/intf_db_connector_module.js @@ -114,24 +114,44 @@ exports.dbConnectorModule = new Schema({ required: "Не реализована функция установки состояния записи очереди (setQueueState)" } }, - //Установка данных сообщения записи очереди + //Считывание данных сообщения очереди + getQueueMsg: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция считывания данных сообщения очереди (getQueueMsg) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция считывания данных сообщения очереди (getQueueMsg)" + } + }, + //Установка данных сообщения очереди setQueueMsg: { use: { validateAsyncFunctionType }, required: true, message: { validateAsyncFunctionType: - "Функция установки данных сообщения записи очереди (setQueueMsg) имеет неверный формат (ожидалось - AsyncFunction)", - required: "Не реализована функция установки данных сообщения записи очереди (setQueueMsg)" + "Функция установки данных сообщения очереди (setQueueMsg) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция установки данных сообщения очереди (setQueueMsg)" } }, - //Установка результата обработки записи очереди + //Считывание результата обработки сообщения очереди + getQueueResp: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция считывания результата обработки сообщения очереди (getQueueResp) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция считывания результата обработки сообщения очереди (getQueueResp)" + } + }, + //Установка результата обработки сообщения очереди setQueueResp: { use: { validateAsyncFunctionType }, required: true, message: { validateAsyncFunctionType: - "Функция установки результата обработки записи очереди (setQueueResp) имеет неверный формат (ожидалось - AsyncFunction)", - required: "Не реализована функция установки результата обработки записи очереди (setQueueResp)" + "Функция установки результата обработки сообщения очереди (setQueueResp) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция установки результата обработки сообщения очереди (setQueueResp)" } }, //Исполнение обработчика со стороны БД для сообщения очереди diff --git a/models/obj_queue.js b/models/obj_queue.js index 3a8c733..bff0b97 100644 --- a/models/obj_queue.js +++ b/models/obj_queue.js @@ -233,24 +233,6 @@ exports.Queue = new Schema({ required: "Не указана информация от обработчика сообщения очереди обмена (sExecMsg)" } }, - //Данные сообщения очереди обмена - blMsg: { - type: Buffer, - required: false, - message: { - type: "Данные сообщения очереди обмена (blMsg) имеют некорректный тип данных (ожидалось - Buffer)", - required: "Не указаны данные сообщения очереди обмена (blMsg)" - } - }, - //Данные ответа сообщения очереди обмена - blResp: { - type: Buffer, - required: false, - message: { - type: "Данные ответа сообщения очереди обмена (blResp) имеют некорректный тип данных (ожидалось - Buffer)", - required: "Не указаны данные ответа сообщения очереди обмена (blResp)" - } - }, //Идентификатор связанного сообщения очереди обмена nQueueId: { type: Number, @@ -262,3 +244,29 @@ exports.Queue = new Schema({ } } }); + +//Схема валидации данных сообщения очереди обмена +exports.QueueMsg = new Schema({ + //Данные сообщения очереди обмена + blMsg: { + type: Buffer, + required: true, + message: { + type: "Данные сообщения очереди обмена (blMsg) имеют некорректный тип данных (ожидалось - Buffer)", + required: "Не указаны данные сообщения очереди обмена (blMsg)" + } + } +}).validator({ required: val => val === null || val }); + +//Схема валидации данных ответа сообщения очереди обмена +exports.QueueResp = new Schema({ + //Данные ответа сообщения очереди обмена + blResp: { + type: Buffer, + required: true, + message: { + type: "Данные ответа сообщения очереди обмена (blResp) имеют некорректный тип данных (ожидалось - Buffer)", + required: "Не указаны данные ответа сообщения очереди обмена (blResp)" + } + } +}).validator({ required: val => val === null || val }); diff --git a/models/prms_db_connector.js b/models/prms_db_connector.js index 224863e..be697f0 100644 --- a/models/prms_db_connector.js +++ b/models/prms_db_connector.js @@ -250,6 +250,19 @@ exports.setQueueState = new Schema({ } }); +//Схема валидации параметров функции считывание данных сообщения из позиции очереди +exports.getQueueMsg = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор позиции очереди ((${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор позиции очереди ((${path})` + } + } +}); + //Схема валидации параметров функции записи данных сообщения в позицию очереди exports.setQueueMsg = new Schema({ //Идентификатор позиции очереди @@ -273,6 +286,19 @@ exports.setQueueMsg = new Schema({ } }).validator({ required: val => val === null || val }); +//Схема валидации параметров функции считывание ответа на сообщение из позиции очереди +exports.getQueueResp = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: path => `Идентификатор позиции очереди ((${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор позиции очереди ((${path})` + } + } +}); + //Схема валидации параметров функции записи ответа на сообщение в позицию очереди exports.setQueueResp = new Schema({ //Идентификатор позиции очереди diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index ef86cb3..06e0b55 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -116,11 +116,7 @@ const getQueue = async prms => { NEXSQUEUE: prms.nQueueId, RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { - outFormat: oracledb.OBJECT, - autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } - } + { outFormat: oracledb.OBJECT, autoCommit: true } ); let rows = await readCursorData(res.outBinds.RCQUEUE); return rows[0]; @@ -140,11 +136,7 @@ const putQueue = async prms => { NEXSQUEUE: prms.nQueueId, RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { - outFormat: oracledb.OBJECT, - autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER } } - } + { outFormat: oracledb.OBJECT, autoCommit: true } ); let rows = await readCursorData(res.outBinds.RCQUEUE); return rows[0]; @@ -162,10 +154,7 @@ const getQueueOutgoing = async prms => { NPORTION_SIZE: prms.nPortionSize, RCQUEUES: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { - outFormat: oracledb.OBJECT, - fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } - } + { outFormat: oracledb.OBJECT } ); let rows = await readCursorData(res.outBinds.RCQUEUES); return rows; @@ -186,13 +175,31 @@ const setQueueState = async prms => { NINC_EXEC_CNT: prms.nIncExecCnt, RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, + { outFormat: oracledb.OBJECT, autoCommit: true } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + +//Считывание данных сообщения очереди +const getQueueMsg = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_MSG_GET(NEXSQUEUE => :NEXSQUEUE, RCQUEUE_MSG => :RCQUEUE_MSG); END;", + { + NEXSQUEUE: prms.nQueueId, + RCQUEUE_MSG: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, { outFormat: oracledb.OBJECT, autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } + fetchInfo: { blMsg: { type: oracledb.BUFFER } } } ); - let rows = await readCursorData(res.outBinds.RCQUEUE); + let rows = await readCursorData(res.outBinds.RCQUEUE_MSG); return rows[0]; } catch (e) { throw new Error(e.message); @@ -209,13 +216,31 @@ const setQueueMsg = async prms => { BMSG: prms.blMsg, RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, + { outFormat: oracledb.OBJECT, autoCommit: true } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + +//Считывание результата обработки сообщения очереди +const getQueueResp = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_RESP_GET(NEXSQUEUE => :NEXSQUEUE, RCQUEUE_RESP => :RCQUEUE_RESP); END;", + { + NEXSQUEUE: prms.nQueueId, + RCQUEUE_RESP: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, { outFormat: oracledb.OBJECT, autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } + fetchInfo: { blResp: { type: oracledb.BUFFER } } } ); - let rows = await readCursorData(res.outBinds.RCQUEUE); + let rows = await readCursorData(res.outBinds.RCQUEUE_RESP); return rows[0]; } catch (e) { throw new Error(e.message); @@ -232,11 +257,7 @@ const setQueueResp = async prms => { BRESP: prms.blResp, RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { - outFormat: oracledb.OBJECT, - autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } - } + { outFormat: oracledb.OBJECT, autoCommit: true } ); let rows = await readCursorData(res.outBinds.RCQUEUE); return rows[0]; @@ -254,11 +275,7 @@ const execQueuePrc = async prms => { NEXSQUEUE: prms.nQueueId, RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { - outFormat: oracledb.OBJECT, - autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } - } + { outFormat: oracledb.OBJECT, autoCommit: true } ); let rows = await readCursorData(res.outBinds.RCQUEUE); return rows[0]; @@ -280,6 +297,8 @@ exports.getQueue = getQueue; exports.putQueue = putQueue; exports.getQueueOutgoing = getQueueOutgoing; exports.setQueueState = setQueueState; +exports.getQueueMsg = getQueueMsg; exports.setQueueMsg = setQueueMsg; +exports.getQueueResp = getQueueResp; exports.setQueueResp = setQueueResp; exports.execQueuePrc = execQueuePrc;