diff --git a/core/db_connector.js b/core/db_connector.js index 926dd6c..fe846ab 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -322,6 +322,41 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Добавить запись очереди обмена + async putQueue(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.putQueue, + "Параметры функции добавления позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Исполняем действие в БД + try { + let res = await this.connector.putQueue({ + nServiceFnId: prms.nServiceFnId, + blMsg: prms.blMsg ? prms.blMsg : new Buffer(""), + nQueueId: prms.nQueueId, + connection: this.connection + }); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.Queue, "Добавленное сообщение очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём добавленную запись + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } //Считать очередную порцию исходящих сообщений async getOutgoing(prms) { if (this.bConnected) { diff --git a/models/intf_db_connector_module.js b/models/intf_db_connector_module.js index e457ddd..9d970f1 100644 --- a/models/intf_db_connector_module.js +++ b/models/intf_db_connector_module.js @@ -84,6 +84,16 @@ exports.dbConnectorModule = new Schema({ required: "Не реализована функция считывания записи очереди обмена (getQueue)" } }, + //Добавление сообщения очереди + putQueue: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция добавления сообщения очереди (putQueue) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция добавления сообщения очереди (putQueue)" + } + }, //Считывание записей исходящих сообщений очереди getQueueOutgoing: { use: { validateAsyncFunctionType }, @@ -94,16 +104,6 @@ exports.dbConnectorModule = new Schema({ required: "Не реализована функция считывания записей исходящих сообщений очереди (getQueueOutgoing)" } }, - //Добавление входящего сообщения очереди - putQueueIncoming: { - use: { validateAsyncFunctionType }, - required: true, - message: { - validateAsyncFunctionType: - "Функция добавления входящего сообщения очереди (putQueueIncoming) имеет неверный формат (ожидалось - AsyncFunction)", - required: "Не реализована функция добавления входящего сообщения очереди (putQueueIncoming)" - } - }, //Уставновка состояния записи очереди setQueueState: { use: { validateAsyncFunctionType }, diff --git a/models/prms_db_connector.js b/models/prms_db_connector.js index 2549dab..76b8cb5 100644 --- a/models/prms_db_connector.js +++ b/models/prms_db_connector.js @@ -147,6 +147,40 @@ exports.getQueue = new Schema({ } }); +//Схема валидации параметров функции добавления позиции очереди +exports.putQueue = new Schema({ + //Идентификатор функции сервиса обработчика позиции очереди + nServiceFnId: { + type: Number, + required: true, + message: { + type: path => + `Идентификатор функции сервиса обработчика позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор функции сервиса обработчика позиции очереди (${path})` + } + }, + //Данные сообщения очереди обмена + blMsg: { + use: { validateBuffer }, + required: false, + message: { + validateBuffer: path => + `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, + required: path => `Не указаны данные сообщения очереди обмена (${path})` + } + }, + //Идентификатор связанной позиции очереди обмена + nQueueId: { + type: Number, + required: false, + message: { + type: path => + `Идентификатор связанной позиции очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор связанной позиции очереди обмена (${path})` + } + } +}); + //Схема валидации параметров функции считывания исходящих сообщений exports.getOutgoing = new Schema({ //Количество считываемых сообщений очереди @@ -223,8 +257,8 @@ exports.setQueueAppSrvResult = new Schema({ type: Number, required: true, message: { - type: "Идентификатор позиции очереди (nQueueId) имеет некорректный тип данных (ожидалось - Number)", - required: "Не указан идентификатор позиции очереди (nQueueId)" + type: path => `Идентификатор позиции очереди ((${path}) имеет некорректный тип данных (ожидалось - Number)`, + required: path => `Не указан идентификатор позиции очереди ((${path})` } }, //Данные сообщения очереди обмена diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index f33f6b1..ef86cb3 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -129,6 +129,30 @@ const getQueue = async prms => { } }; +//Помещение сообщения в очередь +const putQueue = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_PUT(NEXSSERVICEFN => :NEXSSERVICEFN, BMSG => :BMSG, NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;", + { + NEXSSERVICEFN: prms.nServiceFnId, + BMSG: prms.blMsg, + NEXSQUEUE: prms.nQueueId, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { + outFormat: oracledb.OBJECT, + autoCommit: true, + fetchInfo: { blMsg: { type: oracledb.BUFFER } } + } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { try { @@ -150,9 +174,6 @@ const getQueueOutgoing = async prms => { } }; -//Помещение очередного входящего сообщения в очередь -const putQueueIncoming = async prms => {}; - //Установка значения состояния в сообщении очереди const setQueueState = async prms => { try { @@ -256,8 +277,8 @@ exports.getServices = getServices; exports.getServiceFunctions = getServiceFunctions; exports.log = log; exports.getQueue = getQueue; +exports.putQueue = putQueue; exports.getQueueOutgoing = getQueueOutgoing; -exports.putQueueIncoming = putQueueIncoming; exports.setQueueState = setQueueState; exports.setQueueMsg = setQueueMsg; exports.setQueueResp = setQueueResp;