diff --git a/core/db_connector.js b/core/db_connector.js index bf88ba3..926dd6c 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -289,6 +289,39 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, e.message); } } + //Считать запись очереди обмена + async getQueue(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.getQueue, + "Параметры функции считывания записи очереди обмена" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Подготовим параметры + let getQueueData = _.cloneDeep(prms); + getQueueData.connection = this.connection; + try { + //Исполняем действие в БД + let res = await this.connector.getQueue(getQueueData); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.Queue, "Сообщение очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём считанную запись + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Считать очередную порцию исходящих сообщений async getOutgoing(prms) { if (this.bConnected) { diff --git a/models/intf_db_connector_module.js b/models/intf_db_connector_module.js index 68c79ad..e457ddd 100644 --- a/models/intf_db_connector_module.js +++ b/models/intf_db_connector_module.js @@ -74,6 +74,16 @@ exports.dbConnectorModule = new Schema({ required: "Не реализована функция протоколирования работы сервиса (log)" } }, + //Считывание записи очереди обмена + getQueue: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция считывания записи очереди обмена (getQueue) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция считывания записи очереди обмена (getQueue)" + } + }, //Считывание записей исходящих сообщений очереди getQueueOutgoing: { use: { validateAsyncFunctionType }, diff --git a/models/prms_db_connector.js b/models/prms_db_connector.js index a92a34b..2549dab 100644 --- a/models/prms_db_connector.js +++ b/models/prms_db_connector.js @@ -134,6 +134,19 @@ exports.putLog = new Schema({ } }); +//Схема валидации параметров функции считывания позиции очереди +exports.getQueue = new Schema({ + //Идентификатор позиции очереди обмена + nQueueId: { + type: Number, + required: true, + message: { + type: "Идентификатор позиции очереди обмена (nQueueId) имеет некорректный тип данных (ожидалось - Number)", + required: "Не указан идентификатор позиции очереди обмена (nQueueId)" + } + } +}); + //Схема валидации параметров функции считывания исходящих сообщений exports.getOutgoing = new Schema({ //Количество считываемых сообщений очереди diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index 4d8b590..f33f6b1 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -107,6 +107,28 @@ const log = async prms => { } }; +//Считывание записи очереди обмена +const getQueue = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_GET(NFLAG_SMART => 0, NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;", + { + NEXSQUEUE: prms.nQueueId, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { + outFormat: oracledb.OBJECT, + autoCommit: true, + fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } + } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { try { @@ -233,6 +255,7 @@ exports.disconnect = disconnect; exports.getServices = getServices; exports.getServiceFunctions = getServiceFunctions; exports.log = log; +exports.getQueue = getQueue; exports.getQueueOutgoing = getQueueOutgoing; exports.putQueueIncoming = putQueueIncoming; exports.setQueueState = setQueueState;