From a644f08aacf56c00ba8e9220a36dd7f1716d3df1 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Wed, 5 Dec 2018 10:39:36 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A4=D1=83=D0=BD=D0=BA=D1=86=D0=B8=D0=B8=20?= =?UTF-8?q?=D1=83=D1=81=D1=82=D0=B0=D0=BD=D0=BE=D0=B2=D0=BA=D0=B8=20=D1=80?= =?UTF-8?q?=D0=B5=D0=B7=D1=83=D0=BB=D1=8C=D1=82=D0=B0=D1=82=D0=B0=20=D0=BE?= =?UTF-8?q?=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=BA=D0=B8=20=D0=BF=D0=BE?= =?UTF-8?q?=D0=B7=D0=B8=D1=86=D0=B8=D0=B8=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5?= =?UTF-8?q?=D0=B4=D0=B8=20=D1=81=D0=B5=D1=80=D0=B2=D0=B5=D1=80=D0=BE=D0=BC?= =?UTF-8?q?=20=D0=BF=D1=80=D0=B8=D0=BB=D0=BE=D0=B6=D0=B5=D0=BD=D0=B8=D0=B9?= =?UTF-8?q?,=20=D1=84=D1=83=D0=BD=D0=BA=D1=86=D0=B8=D1=8F=20=D0=B7=D0=B0?= =?UTF-8?q?=D0=BF=D1=83=D1=81=D0=BA=D0=B0=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1?= =?UTF-8?q?=D0=BE=D1=82=D1=87=D0=B8=D0=BA=D0=B0=20=D0=91=D0=94=20=D0=B4?= =?UTF-8?q?=D0=BB=D1=8F=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D1=8F=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5=D0=B4=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/db_connector.js | 78 +++++++++++++++++++++++++++++- models/intf_db_connector_module.js | 50 +++++++++++++++---- models/prms_db_connector.js | 61 +++++++++++++++++++++++ modules/parus_oracle_db.js | 73 +++++++++++++++++++++++++++- 4 files changed, 250 insertions(+), 12 deletions(-) diff --git a/core/db_connector.js b/core/db_connector.js index 5c2f9ef..5005860 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -329,7 +329,11 @@ class DBConnector extends EventEmitter { async setQueueState(prms) { if (this.bConnected) { //Проверяем структуру переданных параметров - let sCheckResult = validateObject(prms, prmsDBConnectorSchema.setQueueState); + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.setQueueState, + "Параметры функции установки состояния позиции очереди" + ); //Если структура объекта в норме if (!sCheckResult) { //Подготовим параметры @@ -354,6 +358,78 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); } } + //Установить результат обработки записи сервером приложений + async setQueueAppSrvResult(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.setQueueAppSrvResult, + "Параметры функции установки результата обработки позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Исполняем действие в БД + try { + let res = await this.connector.setQueueMsg({ + nQueueId: prms.nQueueId, + blMsg: prms.blMsg, + connection: this.connection + }); + res = await this.connector.setQueueResp({ + nQueueId: prms.nQueueId, + blResp: prms.blResp, + connection: this.connection + }); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём измененную запись + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } + //Исполнить обработчик со стороны БД + async execQueueDBPrc(prms) { + if (this.bConnected) { + //Проверяем структуру переданных параметров + let sCheckResult = validateObject( + prms, + prmsDBConnectorSchema.execQueueDBPrc, + "Параметры функции исполнения обработчика со стороны БД для позиции очереди" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Исполняем действие в БД + try { + let res = await this.connector.execQueuePrc({ + nQueueId: prms.nQueueId, + connection: this.connection + }); + //Валидируем полученный ответ + sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена"); + if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Вернём измененную запись + return res; + } catch (e) { + if (e instanceof ServerError) throw e; + else throw new ServerError(SERR_DB_EXECUTE, e.message); + } + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } else { + throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); + } + } } //----------------- diff --git a/models/intf_db_connector_module.js b/models/intf_db_connector_module.js index c294d0f..68c79ad 100644 --- a/models/intf_db_connector_module.js +++ b/models/intf_db_connector_module.js @@ -30,7 +30,7 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция подключения к БД (connect) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция подключения к БД (connect) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция подключения к БД (connect)" } }, @@ -40,7 +40,7 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция отключения от БД (disconnect) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция отключения от БД (disconnect) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция отключения от БД (disconnect)" } }, @@ -50,7 +50,7 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция получения списка сервисов (getServices) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция получения списка сервисов (getServices) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция получения списка сервисов (getServices)" } }, @@ -60,7 +60,7 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция получения списка функций сервиса (getServiceFunctions) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция получения списка функций сервиса (getServiceFunctions) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция получения списка функций сервиса (getServiceFunctions)" } }, @@ -70,7 +70,7 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция протоколирования работы сервиса (log) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция протоколирования работы сервиса (log) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция протоколирования работы сервиса (log)" } }, @@ -80,7 +80,7 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция считывания записей исходящих сообщений очереди (getQueueOutgoing) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция считывания записей исходящих сообщений очереди (getQueueOutgoing) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция считывания записей исходящих сообщений очереди (getQueueOutgoing)" } }, @@ -90,18 +90,48 @@ exports.dbConnectorModule = new Schema({ required: true, message: { validateAsyncFunctionType: - "Функция добавления входящего сообщения очереди (putQueueIncoming) имеет неверный формат (ожидалось - Function или AsyncFunction)", + "Функция добавления входящего сообщения очереди (putQueueIncoming) имеет неверный формат (ожидалось - AsyncFunction)", required: "Не реализована функция добавления входящего сообщения очереди (putQueueIncoming)" } }, - //Уствновка состояния записи очереди + //Уставновка состояния записи очереди setQueueState: { use: { validateAsyncFunctionType }, required: true, message: { validateAsyncFunctionType: - "Функция уствновки состояния записи очереди (setQueueState) имеет неверный формат (ожидалось - Function или AsyncFunction)", - required: "Не реализована функция уствновки состояния записи очереди (setQueueState)" + "Функция установки состояния записи очереди (setQueueState) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция установки состояния записи очереди (setQueueState)" + } + }, + //Установка данных сообщения записи очереди + setQueueMsg: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция установки данных сообщения записи очереди (setQueueMsg) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция установки данных сообщения записи очереди (setQueueMsg)" + } + }, + //Установка результата обработки записи очереди + setQueueResp: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция установки результата обработки записи очереди (setQueueResp) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция установки результата обработки записи очереди (setQueueResp)" + } + }, + //Исполнение обработчика со стороны БД для сообщения очереди + execQueuePrc: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: + "Функция исполнения обработчика со стороны БД для сообщения очереди (execQueuePrc) имеет неверный формат (ожидалось - AsyncFunction)", + required: "Не реализована функция исполнения обработчика со стороны БД для сообщения очереди (execQueuePrc)" } } }); diff --git a/models/prms_db_connector.js b/models/prms_db_connector.js index 535ee48..019896f 100644 --- a/models/prms_db_connector.js +++ b/models/prms_db_connector.js @@ -30,6 +30,21 @@ const { NINC_EXEC_CNT_NO = 0; //Не инкрементировать NINC_EXEC_CNT_YES = 1; //Инкрементировать +//------------ +// Тело модуля +//------------ + +//Валидация данных сообщения очереди +const validateBuffer = val => { + //Либо null + if (val === null) { + return true; + } else { + //Либо Buffer + return val instanceof Buffer; + } +}; + //------------------ // Интерфейс модуля //------------------ @@ -187,3 +202,49 @@ exports.setQueueState = new Schema({ } } }); + +//Схема валидации параметров функции установки результата обработки позиции очереди +exports.setQueueAppSrvResult = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: "Идентификатор позиции очереди (nQueueId) имеет некорректный тип данных (ожидалось - Number)", + required: "Не указан идентификатор позиции очереди (nQueueId)" + } + }, + //Данные сообщения очереди обмена + blMsg: { + use: { validateBuffer }, + required: true, + message: { + validateBuffer: path => + `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, + required: path => `Не указаны данные сообщения очереди обмена (${path})` + } + }, + //Данные ответа сообщения очереди обмена + blResp: { + use: { validateBuffer }, + required: true, + message: { + validateBuffer: path => + `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, + required: path => `Не указаны данные ответа сообщения очереди обмена (${path})` + } + } +}).validator({ required: val => val === null || val }); + +//Схема валидации параметров функции исполнения обработчика со стороны БД для позиции очереди +exports.execQueueDBPrc = new Schema({ + //Идентификатор позиции очереди + nQueueId: { + type: Number, + required: true, + message: { + type: "Идентификатор позиции очереди (nQueueId) имеет некорректный тип данных (ожидалось - Number)", + required: "Не указан идентификатор позиции очереди (nQueueId)" + } + } +}); diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index 1331274..4d8b590 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -131,7 +131,7 @@ const getQueueOutgoing = async prms => { //Помещение очередного входящего сообщения в очередь const putQueueIncoming = async prms => {}; -//Установка значения в сообщении очереди +//Установка значения состояния в сообщении очереди const setQueueState = async prms => { try { let res = await prms.connection.execute( @@ -156,6 +156,74 @@ const setQueueState = async prms => { } }; +//Установка данных сообщения очереди +const setQueueMsg = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_MSG_SET(NEXSQUEUE => :NEXSQUEUE, BMSG => :BMSG, RCQUEUE => :RCQUEUE); END;", + { + NEXSQUEUE: prms.nQueueId, + BMSG: prms.blMsg, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { + outFormat: oracledb.OBJECT, + autoCommit: true, + fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } + } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + +//Установка результата обработки сообщения очереди +const setQueueResp = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_RESP_SET(NEXSQUEUE => :NEXSQUEUE, BRESP => :BRESP, RCQUEUE => :RCQUEUE); END;", + { + NEXSQUEUE: prms.nQueueId, + BRESP: prms.blResp, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { + outFormat: oracledb.OBJECT, + autoCommit: true, + fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } + } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + +//Исполнение обработчика со стороны БД для сообщения очереди +const execQueuePrc = async prms => { + try { + let res = await prms.connection.execute( + "BEGIN PKG_EXS.QUEUE_PRC(NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;", + { + NEXSQUEUE: prms.nQueueId, + RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { + outFormat: oracledb.OBJECT, + autoCommit: true, + fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } } + } + ); + let rows = await readCursorData(res.outBinds.RCQUEUE); + return rows[0]; + } catch (e) { + throw new Error(e.message); + } +}; + //----------------- // Интерфейс модуля //----------------- @@ -168,3 +236,6 @@ exports.log = log; exports.getQueueOutgoing = getQueueOutgoing; exports.putQueueIncoming = putQueueIncoming; exports.setQueueState = setQueueState; +exports.setQueueMsg = setQueueMsg; +exports.setQueueResp = setQueueResp; +exports.execQueuePrc = execQueuePrc;