From f1a615b617af8cf3716e7a0cadde6731b1470c23 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Wed, 27 Nov 2024 18:31:40 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-547=20-=20=D0=BF=D0=BE?= =?UTF-8?q?=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B0=20=D0=BF=D0=BE=D0=B4?= =?UTF-8?q?=D0=BA=D0=BB=D1=8E=D1=87=D0=B5=D0=BD=D0=B8=D1=8F=20=D0=BA=20PG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.js | 8 +- config_default.js | 8 +- core/db_connector.js | 13 + models/intf_db_connector_module.js | 51 ++ modules/parus_oracle_db.js | 842 +++++++++++------------------ modules/parus_pg_db.js | 513 ++++++++++++++++++ package-lock.json | 213 ++++++++ package.json | 1 + 8 files changed, 1122 insertions(+), 527 deletions(-) create mode 100644 modules/parus_pg_db.js diff --git a/config.js b/config.js index ce6b874..b24a8d7 100644 --- a/config.js +++ b/config.js @@ -27,8 +27,14 @@ let dbConnect = { sPassword: "exs", //Схема размещения используемых объектов БД sSchema: "PARUS", - //Строка подключения к БД + //Строка подключения к БД Oracle sConnectString: "DEMOP_CITKSERV_WAN", + //Адрес сервера PG + sPGHost: "", + //Порт сервера PG + sPGPort: "", + //База данных PG + sPGDatabase: "", //Наименование сервера приложений в сессии БД sSessionAppName: "PARUS$ExchangeServer", //Подключаемый модуль обслуживания БД (низкоуровневые функции работы с СУБД) diff --git a/config_default.js b/config_default.js index b351ac4..1b352da 100644 --- a/config_default.js +++ b/config_default.js @@ -27,8 +27,14 @@ let dbConnect = { sPassword: "exs", //Схема размещения используемых объектов БД sSchema: "PARUS", - //Строка подключения к БД + //Строка подключения к БД Oracle sConnectString: "", + //Адрес сервера PG + sPGHost: "", + //Порт сервера PG + sPGPort: "", + //База данных PG + sPGDatabase: "", //Наименование сервера приложений в сессии БД sSessionAppName: "PARUS$ExchangeServer", //Подключаемый модуль обслуживания БД (низкоуровневые функции работы с СУБД) diff --git a/core/db_connector.js b/core/db_connector.js index 9237f1c..0f1e444 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -80,6 +80,19 @@ class DBConnector extends EventEmitter { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } + //Исполнение хранимого объекта БД + async executeStored(prms) { + //Работаем только при наличии подключения + if (this.bConnected) { + return await this.connector.executeStored({ + connection: this.connection, + sName: prms.sName, + inPrms: prms.inPrms, + outPrms: prms.outPrms, + isFunction: prms.isFunction + }); + } + } //Подключиться к БД async connect() { //Подключаемся только если ещё не подключены diff --git a/models/intf_db_connector_module.js b/models/intf_db_connector_module.js index e2305e1..06d60d8 100644 --- a/models/intf_db_connector_module.js +++ b/models/intf_db_connector_module.js @@ -24,6 +24,57 @@ const validateAsyncFunctionType = val => { //Схема валидации подключаемого модуля взаимодействия с БД exports.dbConnectorModule = new Schema({ + //Тип данных - строка БД + DT_VARCHAR: { + required: true, + message: { + required: path => `Не реализована константа для типа данных "Строка" БД (${path})` + } + }, + //Тип данных - число БД + DT_NUMBER: { + required: true, + message: { + required: path => `Не реализована константа для типа данных "Число" БД (${path})` + } + }, + //Тип данных - дата БД + DT_DATE: { + required: true, + message: { + required: path => `Не реализована константа для типа данных "Дата" БД (${path})` + } + }, + //Тип данных - текстовые данные БД + DT_CLOB: { + required: true, + message: { + required: path => `Не реализована константа для типа данных "Текстовые данные" БД (${path})` + } + }, + //Тип данных - двоичные данные БД + DT_BLOB: { + required: true, + message: { + required: path => `Не реализована константа для типа данных "Двоичные данные" БД (${path})` + } + }, + //Тип данных - курсор БД + DT_CURSOR: { + required: true, + message: { + required: path => `Не реализована константа для типа данных "Курсор" БД (${path})` + } + }, + //Исполнение хранимого объекта БД + executeStored: { + use: { validateAsyncFunctionType }, + required: true, + message: { + validateAsyncFunctionType: path => `Функция исполнения хранимого объекта БД (${path}) имеет неверный формат (ожидалось - AsyncFunction)`, + required: path => `Не реализована функция исполнения хранимого объекта БД (${path})` + } + }, //Подключение к БД connect: { use: { validateAsyncFunctionType }, diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index 02ec93e..53b9c72 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -14,49 +14,100 @@ const oracledb = require("oracledb"); //Работа с СУБД Oracle //-------------------------- const NPOOL_DRAIN_TIME = 30; //Таймаут ожидания завершения подключений при отключении пула от БД (сек) +const DT_VARCHAR = oracledb.DB_TYPE_VARCHAR; //Тип данных "Строка" БД +const DT_NUMBER = oracledb.DB_TYPE_NUMBER; //Тип данных "Число" БД +const DT_DATE = oracledb.DB_TYPE_DATE; //Тип данных "Дата" БД +const DT_CLOB = oracledb.DB_TYPE_CLOB; //Тип данных "Текстовые данные" БД +const DT_BLOB = oracledb.DB_TYPE_BLOB; //Тип данных "Двоичные данные" БД +const DT_CURSOR = oracledb.CURSOR; //Тип данных "Курсор" БД //------------ // Тело модуля //------------ -//Чтение данных из курсора -const readCursorData = cursor => { - return new Promise((resolve, reject) => { - let queryStream = cursor.toQueryStream(); - let rows = []; - queryStream.on("data", row => { - rows.push(row); - }); - queryStream.on("error", err => { - queryStream.destroy(); - reject(new Error(err.message)); - }); - queryStream.on("close", () => { - queryStream.destroy(); - resolve(rows); - }); - }); +//Формирование имени хранимого объекта БД (с параметрами) +const makeStoredName = (sName, inPrms, outPrms, isFunction = false) => { + let prms = ""; + let resultVar = ""; + for (i in inPrms) prms += `${prms ? ", " : ""}${i} => :${i}`; + for (i in outPrms) { + if (isFunction) resultVar = `:${i} := `; + else prms += `${prms ? ", " : ""}${i} => :${i}`; + break; + } + return `${resultVar ? resultVar : ""}${sName.replace(/\$[0-9]{1,9}$/, "").replace("$", ".")}(${prms})`; }; -//Проверка допустимого количества воркеров -const checkWorkers = async prms => { +//Формирование параметров хранимого объекта БД +const makeStoredPrms = (inPrms, outPrms) => { + let prms = inPrms ? inPrms : {}; + for (i in outPrms) { + prms[i] = { + type: outPrms[i], + dir: oracledb.BIND_OUT + }; + break; + } + return prms; +}; + +//Исполнение хранимого объекта БД +const executeStored = async prms => { let pooledConnection; + let outResult = {}; try { pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute("BEGIN :LIC_CNT := PKG_EXS.UTL_LIC_CLNT_COUNT_GET(); END;", { - LIC_CNT: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER } - }); - let nMaxLic = res.outBinds.LIC_CNT; - if (nMaxLic === 0) { - throw new Error(`Не определено количество лицензий для приложения "ExchangeServer".`); - } - if (prms.nMaxWorkers > nMaxLic - 1) { - throw new Error( - `Недопустимое значение параметра "Количество одновременно обрабатываемых исходящих сообщений" ("outGoing.nMaxWorkers") файла конфигурации сервиса приложений ("config.js"). Максимальное количество одновременно обрабатываемых исходящих сообщений - ${ - nMaxLic - 1 - }.` - ); + let outPrmName = ""; + let outPrmIsCursor = false; + for (i in prms.outPrms) { + outPrmName = i; + outPrmIsCursor = prms.outPrms[i] == DT_CURSOR; + break; } + let res = await pooledConnection.execute( + `begin ${makeStoredName(prms.sName, prms.inPrms, prms.outPrms, prms.isFunction)}; end;`, + makeStoredPrms(prms.inPrms, prms.outPrms), + { + ...(outPrmIsCursor ? { outFormat: oracledb.OBJECT } : {}), + ...{ autoCommit: true } + } + ); + if (res) + if (res.outBinds) { + const outBind = res.outBinds[outPrmName]; + if (outPrmIsCursor) { + const readCursorData = cursor => { + return new Promise((resolve, reject) => { + let queryStream = cursor.toQueryStream(); + let rows = []; + queryStream.on("data", row => { + rows.push(row); + }); + queryStream.on("error", err => { + queryStream.destroy(); + reject(new Error(err.message)); + }); + queryStream.on("close", () => { + queryStream.destroy(); + resolve(rows); + }); + }); + }; + let rows = await readCursorData(outBind); + let rowsRes = []; + await Promise.all( + rows.map(async row => { + let rowRes = {}; + for (i in row) { + let isLob = row[i] ? (row[i].type ? row[i].type == oracledb.BLOB || row[i].type == oracledb.CLOB : false) : false; + rowRes[i] = isLob ? await row[i].getData() : row[i]; + } + rowsRes.push(rowRes); + }) + ); + outResult[outPrmName] = rowsRes; + } else outResult[outPrmName] = outBind; + } } catch (e) { throw new Error(e.message); } finally { @@ -68,30 +119,43 @@ const checkWorkers = async prms => { } } } + if (outResult) return outResult; +}; + +//Проверка допустимого количества воркеров +const checkWorkers = async prms => { + let workersData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.UTL_LIC_CLNT_COUNT_GET", + outPrms: { + LIC_CNT: DT_NUMBER + }, + isFunction: true + }); + if (workersData.LIC_CNT === 0) { + throw new Error(`Не определено количество лицензий для приложения "ExchangeServer".`); + } + if (prms.nMaxWorkers > workersData.LIC_CNT - 1) { + throw new Error( + `Недопустимое значение параметра "Количество одновременно обрабатываемых исходящих сообщений" ("outGoing.nMaxWorkers") файла конфигурации сервиса приложений ("config.js"). Максимальное количество одновременно обрабатываемых исходящих сообщений - ${ + workersData.LIC_CNT - 1 + }.` + ); + } }; //Проверка соответствия релизов сервера приложений и системы const checkRelease = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute("BEGIN :DB_RELEASE := PKG_EXS.UTL_PRODUCT_RELEASE_GET(); END;", { - DB_RELEASE: { dir: oracledb.BIND_OUT, type: oracledb.DB_TYPE_VARCHAR } - }); - let sDB_RELEASE = res.outBinds.DB_RELEASE; - if (sDB_RELEASE !== prms.sRelease) { - throw new Error(`Версия сервера приложений (${prms.sRelease}) не соответствует версии системы (${sDB_RELEASE}).`); - } - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } + let releaseData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.UTL_PRODUCT_RELEASE_GET", + outPrms: { + DB_RELEASE: DT_VARCHAR + }, + isFunction: true + }); + if (releaseData.DB_RELEASE !== prms.sRelease) { + throw new Error(`Версия сервера приложений (${prms.sRelease}) не соответствует версии системы (${releaseData.DB_RELEASE}).`); } }; @@ -140,566 +204,294 @@ const disconnect = async prms => { //Получение списка сервисов const getServices = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICES_GET(RCSERVICES => :RCSERVICES); END;", - { RCSERVICES: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { outFormat: oracledb.OBJECT } - ); - let rows = await readCursorData(res.outBinds.RCSERVICES); - return rows; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } + let servicesData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICES_GET", + outPrms: { + RCSERVICES: DT_CURSOR } - } + }); + return servicesData.RCSERVICES; }; //Получение списка функций сервиса const getServiceFunctions = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICEFNS_GET(NEXSSERVICE => :NEXSSERVICE, RCSERVICEFNS => :RCSERVICEFNS); END;", - { NEXSSERVICE: prms.nServiceId, RCSERVICEFNS: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { outFormat: oracledb.OBJECT } - ); - let rows = await readCursorData(res.outBinds.RCSERVICEFNS); - return rows; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } + let serviceFunctionsData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICEFNS_GET", + inPrms: { + NEXSSERVICE: prms.nServiceId + }, + outPrms: { + RCSERVICEFNS: DT_CURSOR } - } + }); + return serviceFunctionsData.RCSERVICEFNS; }; //Получение контекста сервиса const getServiceContext = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICE_CTX_GET(NFLAG_SMART => 0, NEXSSERVICE => :NEXSSERVICE, RCSERVICE_CTX => :RCSERVICE_CTX); END;", - { NEXSSERVICE: prms.nServiceId, RCSERVICE_CTX: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, - { outFormat: oracledb.OBJECT } - ); - let rows = await readCursorData(res.outBinds.RCSERVICE_CTX); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } + let serviceContextData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICE_CTX_GET", + inPrms: { + NFLAG_SMART: 0, + NEXSSERVICE: prms.nServiceId + }, + outPrms: { + RCSERVICE_CTX: DT_CURSOR } - } + }); + return serviceContextData.RCSERVICE_CTX[0]; }; //Установка контекста сервиса const setServiceContext = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICE_CTX_SET(NEXSSERVICE => :NEXSSERVICE, SCTX => :SCTX, DCTX_EXP => :DCTX_EXP); END;", - { NEXSSERVICE: prms.nServiceId, SCTX: prms.sCtx, DCTX_EXP: prms.dCtxExp }, - { autoCommit: true } - ); - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICE_CTX_SET", + inPrms: { + NEXSSERVICE: prms.nServiceId, + SCTX: prms.sCtx, + DCTX_EXP: prms.dCtxExp } - } + }); }; //Очистка контекста сервиса const clearServiceContext = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICE_CTX_CLEAR(NEXSSERVICE => :NEXSSERVICE); END;", - { NEXSSERVICE: prms.nServiceId }, - { autoCommit: true } - ); - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICE_CTX_CLEAR", + inPrms: { NEXSSERVICE: prms.nServiceId } + }); }; //Проверка атуентифицированности сервиса const isServiceAuth = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute("BEGIN :RET := PKG_EXS.SERVICE_IS_AUTH(NEXSSERVICE => :NEXSSERVICE); END;", { - NEXSSERVICE: prms.nServiceId, - RET: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER } - }); - return res.outBinds.RET; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let serviceAuth = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICE_IS_AUTH", + inPrms: { NEXSSERVICE: prms.nServiceId }, + outPrms: { + RET: DT_NUMBER + }, + isFunction: true + }); + return serviceAuth.RET; }; //Постановка в очередь задания на аутентификацию сервиса const putServiceAuthInQueue = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICE_AUTH_PUT_INQUEUE_AT(NEXSSERVICE => :NEXSSERVICE, NFORCE => :NFORCE); END;", - { NEXSSERVICE: prms.nServiceId, NFORCE: prms.nForce }, - { autoCommit: true } - ); - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICE_AUTH_PUT_INQUEUE_AT", + inPrms: { NEXSSERVICE: prms.nServiceId, NFORCE: prms.nForce } + }); }; //Получение информации о просроченных сообщениях обмена сервиса const getServiceExpiredQueueInfo = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.SERVICE_QUEUE_EXPIRED_INFO_GET(NEXSSERVICE => :NEXSSERVICE, RCSERVICE_QUEUE_EXPIRED_INFO => :RCSERVICE_QUEUE_EXPIRED_INFO); END;", - { - NEXSSERVICE: prms.nServiceId, - RCSERVICE_QUEUE_EXPIRED_INFO: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT } - ); - let rows = await readCursorData(res.outBinds.RCSERVICE_QUEUE_EXPIRED_INFO); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } + let serviceExpiredQueueInfoData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.SERVICE_QUEUE_EXPIRED_INFO_GET", + inPrms: { + NEXSSERVICE: prms.nServiceId + }, + outPrms: { + RCSERVICE_QUEUE_EXPIRED_INFO: DT_CURSOR } - } + }); + return serviceExpiredQueueInfoData.RCSERVICE_QUEUE_EXPIRED_INFO[0]; }; //Запись в протокол работы const log = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.LOG_PUT(NLOG_STATE => :NLOG_STATE, SMSG => :SMSG, NEXSSERVICE => :NEXSSERVICE, NEXSSERVICEFN => :NEXSSERVICEFN, NEXSQUEUE => :NEXSQUEUE, RCLOG => :RCLOG); END;", - { - NLOG_STATE: prms.nLogState, - SMSG: prms.sMsg, - NEXSSERVICE: prms.nServiceId, - NEXSSERVICEFN: prms.nServiceFnId, - NEXSQUEUE: prms.nQueueId, - RCLOG: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCLOG); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let logData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.LOG_PUT", + inPrms: { + NLOG_STATE: prms.nLogState, + SMSG: prms.sMsg, + NEXSSERVICE: prms.nServiceId, + NEXSSERVICEFN: prms.nServiceFnId, + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCLOG: DT_CURSOR } + }); + return logData.RCLOG[0]; }; //Считывание записи очереди обмена const getQueue = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_GET(NFLAG_SMART => 0, NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;", - { - NEXSQUEUE: prms.nQueueId, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_GET", + inPrms: { + NFLAG_SMART: 0, + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueData.RCQUEUE[0]; }; //Помещение сообщения в очередь const putQueue = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_PUT(NEXSSERVICEFN => :NEXSSERVICEFN, BMSG => :BMSG, NEXSQUEUE => :NEXSQUEUE, NLNK_COMPANY => :NLNK_COMPANY, NLNK_DOCUMENT => :NLNK_DOCUMENT, SLNK_UNITCODE => :SLNK_UNITCODE, SOPTIONS => :SOPTIONS, RCQUEUE => :RCQUEUE); END;", - { - NEXSSERVICEFN: prms.nServiceFnId, - BMSG: prms.blMsg, - NEXSQUEUE: prms.nQueueId, - NLNK_COMPANY: prms.nLnkCompanyId, - NLNK_DOCUMENT: prms.nLnkDocumentId, - SLNK_UNITCODE: prms.sLnkUnitcode, - SOPTIONS: prms.sOptions, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_PUT", + inPrms: { + NEXSSERVICEFN: prms.nServiceFnId, + BMSG: prms.blMsg, + NEXSQUEUE: prms.nQueueId, + NLNK_COMPANY: prms.nLnkCompanyId, + NLNK_DOCUMENT: prms.nLnkDocumentId, + SLNK_UNITCODE: prms.sLnkUnitcode, + SOPTIONS: prms.sOptions + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueData.RCQUEUE[0]; }; //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_SRV_TYPE_SEND_GET(NPORTION_SIZE => :NPORTION_SIZE, RCQUEUES => :RCQUEUES); END;", - { - NPORTION_SIZE: prms.nPortionSize, - RCQUEUES: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT } - ); - let rows = await readCursorData(res.outBinds.RCQUEUES); - return rows; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueOutgoingData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_SRV_TYPE_SEND_GET", + inPrms: { + NPORTION_SIZE: prms.nPortionSize + }, + outPrms: { RCQUEUES: DT_CURSOR } + }); + return queueOutgoingData.RCQUEUES; }; //Установка значения состояния в сообщении очереди const setQueueState = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_EXEC_STATE_SET(NEXSQUEUE => :NEXSQUEUE, NEXEC_STATE => :NEXEC_STATE, SEXEC_MSG => :SEXEC_MSG, NINC_EXEC_CNT => :NINC_EXEC_CNT, NRESET_DATA => :NRESET_DATA, RCQUEUE => :RCQUEUE); END;", - { - NEXSQUEUE: prms.nQueueId, - NEXEC_STATE: prms.nExecState, - SEXEC_MSG: prms.sExecMsg, - NINC_EXEC_CNT: prms.nIncExecCnt, - NRESET_DATA: prms.nResetData, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueStateData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_EXEC_STATE_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + NEXEC_STATE: prms.nExecState, + SEXEC_MSG: prms.sExecMsg, + NINC_EXEC_CNT: prms.nIncExecCnt, + NRESET_DATA: prms.nResetData + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueStateData.RCQUEUE[0]; }; //Считывание данных сообщения очереди const getQueueMsg = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_MSG_GET(NEXSQUEUE => :NEXSQUEUE, RCQUEUE_MSG => :RCQUEUE_MSG); END;", - { - NEXSQUEUE: prms.nQueueId, - RCQUEUE_MSG: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { - outFormat: oracledb.OBJECT, - autoCommit: true, - fetchInfo: { blMsg: { type: oracledb.BUFFER } } - } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE_MSG); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueMsgData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_MSG_GET", + inPrms: { + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCQUEUE_MSG: DT_CURSOR } + }); + return queueMsgData.RCQUEUE_MSG[0]; }; //Установка данных сообщения очереди const setQueueMsg = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_MSG_SET(NEXSQUEUE => :NEXSQUEUE, BMSG => :BMSG, RCQUEUE => :RCQUEUE); END;", - { - NEXSQUEUE: prms.nQueueId, - BMSG: prms.blMsg, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueMsgData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_MSG_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + BMSG: prms.blMsg + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueMsgData.RCQUEUE[0]; }; //Установка параметров сообщения очереди const setQueueOptions = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_OPTIONS_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS => :SOPTIONS, RCQUEUE => :RCQUEUE); END;", - { - NEXSQUEUE: prms.nQueueId, - SOPTIONS: prms.sOptions, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueOptionsData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_OPTIONS_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + SOPTIONS: prms.sOptions + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueOptionsData.RCQUEUE[0]; }; //Считывание результата обработки сообщения очереди const getQueueResp = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_RESP_GET(NEXSQUEUE => :NEXSQUEUE, RCQUEUE_RESP => :RCQUEUE_RESP); END;", - { - NEXSQUEUE: prms.nQueueId, - RCQUEUE_RESP: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { - outFormat: oracledb.OBJECT, - autoCommit: true, - fetchInfo: { blResp: { type: oracledb.BUFFER } } - } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE_RESP); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueRespData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_RESP_GET", + inPrms: { + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCQUEUE_RESP: DT_CURSOR } + }); + return queueRespData.RCQUEUE_RESP[0]; }; //Установка результата обработки сообщения очереди const setQueueResp = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_RESP_SET(NEXSQUEUE => :NEXSQUEUE, BRESP => :BRESP, NIS_ORIGINAL => :NIS_ORIGINAL, RCQUEUE => :RCQUEUE); END;", - { - NEXSQUEUE: prms.nQueueId, - BRESP: prms.blResp, - NIS_ORIGINAL: prms.nIsOriginal, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueRespData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_RESP_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + BRESP: prms.blResp, + NIS_ORIGINAL: prms.nIsOriginal + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueRespData.RCQUEUE[0]; }; //Установка параметров результата обработки сообщения очереди const setQueueOptionsResp = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_OPTIONS_RESP_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS_RESP => :SOPTIONS_RESP, RCQUEUE => :RCQUEUE); END;", - { - NEXSQUEUE: prms.nQueueId, - SOPTIONS_RESP: prms.sOptionsResp, - RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCQUEUE); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queueOptionsRespData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_OPTIONS_RESP_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + SOPTIONS_RESP: prms.sOptionsResp + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueOptionsRespData.RCQUEUE[0]; }; //Исполнение обработчика со стороны БД для сообщения очереди const execQueuePrc = async prms => { - let pooledConnection; - try { - pooledConnection = await prms.connection.getConnection(); - let res = await pooledConnection.execute( - "BEGIN PKG_EXS.QUEUE_PRC(NEXSQUEUE => :NEXSQUEUE, RCRESULT => :RCRESULT); END;", - { - NEXSQUEUE: prms.nQueueId, - RCRESULT: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } - }, - { outFormat: oracledb.OBJECT, autoCommit: true } - ); - let rows = await readCursorData(res.outBinds.RCRESULT); - return rows[0]; - } catch (e) { - throw new Error(e.message); - } finally { - if (pooledConnection) { - try { - await pooledConnection.close(); - } catch (e) { - throw new Error(e.message); - } - } - } + let queuePrcData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS.QUEUE_PRC", + inPrms: { + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCRESULT: DT_CURSOR } + }); + return queuePrcData.RCRESULT[0]; }; //----------------- // Интерфейс модуля //----------------- +exports.DT_VARCHAR = DT_VARCHAR; +exports.DT_NUMBER = DT_NUMBER; +exports.DT_DATE = DT_DATE; +exports.DT_CLOB = DT_CLOB; +exports.DT_BLOB = DT_BLOB; +exports.DT_CURSOR = DT_CURSOR; +exports.executeStored = executeStored; exports.connect = connect; exports.disconnect = disconnect; exports.getServices = getServices; diff --git a/modules/parus_pg_db.js b/modules/parus_pg_db.js new file mode 100644 index 0000000..636946b --- /dev/null +++ b/modules/parus_pg_db.js @@ -0,0 +1,513 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Дополнительный модуль: работа с БД ПП Парус 8 (Postgre) +*/ + +//---------------------- +// Подключение библиотек +//---------------------- + +const pg = require("pg"); //Работа с СУБД Postgre + +//-------------------------- +// Глобальные идентификаторы +//-------------------------- + +const DT_VARCHAR = pg.types.builtins.VARCHAR; //Тип данных "Строка" БД +const DT_NUMBER = pg.types.builtins.NUMERIC; //Тип данных "Число" БД +const DT_DATE = pg.types.builtins.DATE; //Тип данных "Дата" БД +const DT_CLOB = pg.types.builtins.TEXT; //Тип данных "Текстовые данные" БД +const DT_BLOB = pg.types.builtins.BYTEA; //Тип данных "Двоичные данные" БД +const DT_CURSOR = pg.types.builtins.REFCURSOR; //Тип данных "Курсор" БД + +//------------ +// Тело модуля +//------------ + +//Установка парсера значений для типа данных NUMERIC БД PG +pg.types.setTypeParser(pg.types.builtins.NUMERIC, val => { + return val ? Number(val) : val; +}); + +//Формирование имени хранимого объекта БД (с параметрами) +const makeStoredName = (sName, inPrms) => { + let prms = ""; + let prm_numb = 0; + for (i in inPrms) { + prm_numb++; + let prm_type = ""; + if (inPrms[i] != null && inPrms[i] != undefined) + switch (typeof inPrms[i]) { + case "string": + if (inPrms[i].length ? inPrms[i].length > 4000 : false) prm_type = "::text"; + else prm_type = "::character varying"; + break; + case "number": + prm_type = "::numeric"; + break; + case "bigint": + prm_type = "::numeric"; + break; + case "boolean": + prm_type = "::boolean"; + break; + case "object": + if (inPrms[i] instanceof Date) prm_type = "::date"; + else prm_type = "::bytea"; + break; + default: + break; + } + prms += `${prms ? ", " : ""}$${prm_numb}${prm_type}`; + } + return `${sName.replace(".", "$")}(${prms})`; +}; + +//Формирование параметров хранимого объекта БД +const makeStoredPrms = inPrms => { + let prms = []; + for (i in inPrms) prms.push(inPrms[i]); + return prms; +}; + +//Исполнение хранимого объекта БД +const executeStored = async prms => { + let client; + let outResult = {}; + try { + client = await prms.connection.connect(); + let outPrmName = ""; + let outPrmIsCursor = false; + for (i in prms.outPrms) { + outPrmName = i; + outPrmIsCursor = prms.outPrms[i] == DT_CURSOR; + break; + } + await client.query(`begin;`); + let res = await client.query({ + text: `select ${makeStoredName(prms.sName, prms.inPrms)}${outPrmName ? ` as "${outPrmName}"` : ""};`, + values: makeStoredPrms(prms.inPrms) + }); + if (res) + if (res.rows) + if (res.rows[0]) { + const outBind = res.rows[0][outPrmName]; + if (outPrmIsCursor) { + let rowsRes = []; + let doExit = false; + while (!doExit) { + doExit = true; + res = await client.query(`fetch next from "${outBind}";`); + if (res) + if (res.rows) + if (res.rowCount > 0 && res.rows.length > 0) { + doExit = false; + res.rows.map(r => rowsRes.push(r)); + } + } + outResult[outPrmName] = rowsRes; + } else outResult[outPrmName] = outBind; + } + await client.query(`commit;`); + } catch (e) { + if (client) { + try { + await client.query(`rollback;`); + } catch (err) { + throw new Error(err.message); + } + } + throw new Error(e.message); + } finally { + if (client) { + try { + await client.release(); + } catch (err) { + throw new Error(err.message); + } + } + } + if (outResult) return outResult; +}; + +//Проверка допустимого количества воркеров +const checkWorkers = async prms => { + let workersData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$UTL_LIC_CLNT_COUNT_GET", + outPrms: { + LIC_CNT: DT_NUMBER + } + }); + if (workersData.LIC_CNT === 0) { + throw new Error(`Не определено количество лицензий для приложения "ExchangeServer".`); + } + if (prms.nMaxWorkers > workersData.LIC_CNT - 1) { + throw new Error( + `Недопустимое значение параметра "Количество одновременно обрабатываемых исходящих сообщений" ("outGoing.nMaxWorkers") файла конфигурации сервиса приложений ("config.js"). Максимальное количество одновременно обрабатываемых исходящих сообщений - ${ + workersData.LIC_CNT - 1 + }.` + ); + } +}; + +//Проверка соответствия релизов сервера приложений и системы +const checkRelease = async prms => { + let releaseData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$UTL_PRODUCT_RELEASE_GET", + outPrms: { + DB_RELEASE: DT_VARCHAR + } + }); + if (releaseData.DB_RELEASE !== prms.sRelease) { + throw new Error(`Версия сервера приложений (${prms.sRelease}) не соответствует версии системы (${releaseData.DB_RELEASE}).`); + } +}; + +//Подключение к БД +const connect = async prms => { + try { + let pool = new pg.Pool({ + user: prms.sUser, + password: prms.sPassword, + host: prms.sPGHost, + port: prms.sPGPort, + database: prms.sPGDatabase, + connectionTimeoutMillis: 600000, + min: prms.nPoolMin ? prms.nPoolMin : 4, + max: prms.nPoolMax ? prms.nPoolMax : 4 + }); + pool.on("acquire", async client => { + await client.query(`select ALTER_SESSION_SET_SCHEMA($1);`, [prms.sSchema]); + }); + if (prms.bControlSystemVersion) { + await checkRelease({ sRelease: prms.sRelease, connection: pool }); + } + await checkWorkers({ nMaxWorkers: prms.nMaxWorkers, connection: pool }); + return pool; + } catch (e) { + throw new Error(e.message); + } +}; + +//Отключение от БД +const disconnect = async prms => { + try { + await prms.connection.end(); + } catch (e) { + throw new Error(e.message); + } +}; + +//Получение списка сервисов +const getServices = async prms => { + let servicesData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICES_GET", + outPrms: { + RCSERVICES: DT_CURSOR + } + }); + return servicesData.RCSERVICES; +}; + +//Получение списка функций сервиса +const getServiceFunctions = async prms => { + let serviceFunctionsData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICEFNS_GET", + inPrms: { + NEXSSERVICE: prms.nServiceId + }, + outPrms: { + RCSERVICEFNS: DT_CURSOR + } + }); + return serviceFunctionsData.RCSERVICEFNS; +}; + +//Получение контекста сервиса +const getServiceContext = async prms => { + let serviceContextData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICE_CTX_GET", + inPrms: { + NFLAG_SMART: 0, + NEXSSERVICE: prms.nServiceId + }, + outPrms: { + RCSERVICE_CTX: DT_CURSOR + } + }); + return serviceContextData.RCSERVICE_CTX[0]; +}; + +//Установка контекста сервиса +const setServiceContext = async prms => { + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICE_CTX_SET", + inPrms: { + NEXSSERVICE: prms.nServiceId, + SCTX: prms.sCtx, + DCTX_EXP: prms.dCtxExp + } + }); +}; + +//Очистка контекста сервиса +const clearServiceContext = async prms => { + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICE_CTX_CLEAR", + inPrms: { NEXSSERVICE: prms.nServiceId } + }); +}; + +//Проверка атуентифицированности сервиса +const isServiceAuth = async prms => { + let serviceAuth = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICE_IS_AUTH", + inPrms: { NEXSSERVICE: prms.nServiceId }, + outPrms: { + RET: DT_NUMBER + } + }); + return serviceAuth.RET; +}; + +//Постановка в очередь задания на аутентификацию сервиса +const putServiceAuthInQueue = async prms => { + await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICE_AUTH_PUT_INQUEUE_AT", + inPrms: { NEXSSERVICE: prms.nServiceId, NFORCE: prms.nForce } + }); +}; + +//Получение информации о просроченных сообщениях обмена сервиса +const getServiceExpiredQueueInfo = async prms => { + let serviceExpiredQueueInfoData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$SERVICE_QUEUE_EXPIRED_INFO_GET", + inPrms: { + NEXSSERVICE: prms.nServiceId + }, + outPrms: { + RCSERVICE_QUEUE_EXPIRED_INFO: DT_CURSOR + } + }); + return serviceExpiredQueueInfoData.RCSERVICE_QUEUE_EXPIRED_INFO[0]; +}; + +//Запись в протокол работы +const log = async prms => { + let logData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$LOG_PUT", + inPrms: { + NLOG_STATE: prms.nLogState, + SMSG: prms.sMsg, + NEXSSERVICE: prms.nServiceId, + NEXSSERVICEFN: prms.nServiceFnId, + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCLOG: DT_CURSOR } + }); + return logData.RCLOG[0]; +}; + +//Считывание записи очереди обмена +const getQueue = async prms => { + let queueData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_GET", + inPrms: { + NFLAG_SMART: 0, + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueData.RCQUEUE[0]; +}; + +//Помещение сообщения в очередь +const putQueue = async prms => { + let queueData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_PUT$2", + inPrms: { + NEXSSERVICEFN: prms.nServiceFnId, + BMSG: prms.blMsg, + NEXSQUEUE: prms.nQueueId, + NLNK_COMPANY: prms.nLnkCompanyId, + NLNK_DOCUMENT: prms.nLnkDocumentId, + SLNK_UNITCODE: prms.sLnkUnitcode, + SOPTIONS: prms.sOptions + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueData.RCQUEUE[0]; +}; + +//Считывание очередной порции исходящих сообщений из очереди +const getQueueOutgoing = async prms => { + let queueOutgoingData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_SRV_TYPE_SEND_GET", + inPrms: { + NPORTION_SIZE: prms.nPortionSize + }, + outPrms: { RCQUEUES: DT_CURSOR } + }); + return queueOutgoingData.RCQUEUES; +}; + +//Установка значения состояния в сообщении очереди +const setQueueState = async prms => { + let queueStateData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_EXEC_STATE_SET", + inPrms: { + NEXSQUEUE: prms.nQueueId, + NEXEC_STATE: prms.nExecState, + SEXEC_MSG: prms.sExecMsg, + NINC_EXEC_CNT: prms.nIncExecCnt, + NRESET_DATA: prms.nResetData + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueStateData.RCQUEUE[0]; +}; + +//Считывание данных сообщения очереди +const getQueueMsg = async prms => { + let queueMsgData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_MSG_GET", + inPrms: { + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCQUEUE_MSG: DT_CURSOR } + }); + return queueMsgData.RCQUEUE_MSG[0]; +}; + +//Установка данных сообщения очереди +const setQueueMsg = async prms => { + let queueMsgData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_MSG_SET$2", + inPrms: { + NEXSQUEUE: prms.nQueueId, + BMSG: prms.blMsg + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueMsgData.RCQUEUE[0]; +}; + +//Установка параметров сообщения очереди +const setQueueOptions = async prms => { + let queueOptionsData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_OPTIONS_SET$2", + inPrms: { + NEXSQUEUE: prms.nQueueId, + SOPTIONS: prms.sOptions + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueOptionsData.RCQUEUE[0]; +}; + +//Считывание результата обработки сообщения очереди +const getQueueResp = async prms => { + let queueRespData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_RESP_GET", + inPrms: { + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCQUEUE_RESP: DT_CURSOR } + }); + return queueRespData.RCQUEUE_RESP[0]; +}; + +//Установка результата обработки сообщения очереди +const setQueueResp = async prms => { + let queueRespData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_RESP_SET$2", + inPrms: { + NEXSQUEUE: prms.nQueueId, + BRESP: prms.blResp, + NIS_ORIGINAL: prms.nIsOriginal + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueRespData.RCQUEUE[0]; +}; + +//Установка параметров результата обработки сообщения очереди +const setQueueOptionsResp = async prms => { + let queueOptionsRespData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_OPTIONS_RESP_SET$2", + inPrms: { + NEXSQUEUE: prms.nQueueId, + SOPTIONS_RESP: prms.sOptionsResp + }, + outPrms: { RCQUEUE: DT_CURSOR } + }); + return queueOptionsRespData.RCQUEUE[0]; +}; + +//Исполнение обработчика со стороны БД для сообщения очереди +const execQueuePrc = async prms => { + let queuePrcData = await executeStored({ + connection: prms.connection, + sName: "PKG_EXS$QUEUE_PRC", + inPrms: { + NEXSQUEUE: prms.nQueueId + }, + outPrms: { RCRESULT: DT_CURSOR } + }); + return queuePrcData.RCRESULT[0]; +}; + +//----------------- +// Интерфейс модуля +//----------------- + +exports.DT_VARCHAR = DT_VARCHAR; +exports.DT_NUMBER = DT_NUMBER; +exports.DT_DATE = DT_DATE; +exports.DT_CLOB = DT_CLOB; +exports.DT_BLOB = DT_BLOB; +exports.DT_CURSOR = DT_CURSOR; +exports.executeStored = executeStored; +exports.connect = connect; +exports.disconnect = disconnect; +exports.getServices = getServices; +exports.getServiceFunctions = getServiceFunctions; +exports.getServiceContext = getServiceContext; +exports.setServiceContext = setServiceContext; +exports.clearServiceContext = clearServiceContext; +exports.isServiceAuth = isServiceAuth; +exports.putServiceAuthInQueue = putServiceAuthInQueue; +exports.getServiceExpiredQueueInfo = getServiceExpiredQueueInfo; +exports.log = log; +exports.getQueue = getQueue; +exports.putQueue = putQueue; +exports.getQueueOutgoing = getQueueOutgoing; +exports.setQueueState = setQueueState; +exports.getQueueMsg = getQueueMsg; +exports.setQueueMsg = setQueueMsg; +exports.setQueueOptions = setQueueOptions; +exports.getQueueResp = getQueueResp; +exports.setQueueResp = setQueueResp; +exports.setQueueOptionsResp = setQueueOptionsResp; +exports.execQueuePrc = execQueuePrc; diff --git a/package-lock.json b/package-lock.json index f6e373e..b88a74b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,6 +18,7 @@ "mqtt": "^5.10.1", "nodemailer": "^6.4.11", "oracledb": "^4.2.0", + "pg": "^8.13.1", "request": "^2.88.2", "request-promise": "^4.2.6", "validate": "^5.1.0", @@ -998,6 +999,122 @@ "resolved": "https://registry.npmjs.org/performance-now/-/performance-now-2.1.0.tgz", "integrity": "sha1-Ywn04OX6kT7BxpMHrjZLSzd8nns=" }, + "node_modules/pg": { + "version": "8.13.1", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.13.1.tgz", + "integrity": "sha512-OUir1A0rPNZlX//c7ksiu7crsGZTKSOXJPgtNiHGIlC9H0lO+NC6ZDYksSgBYY/thSWhnSRBv8w1lieNNGATNQ==", + "dependencies": { + "pg-connection-string": "^2.7.0", + "pg-pool": "^3.7.0", + "pg-protocol": "^1.7.0", + "pg-types": "^2.1.0", + "pgpass": "1.x" + }, + "engines": { + "node": ">= 8.0.0" + }, + "optionalDependencies": { + "pg-cloudflare": "^1.1.1" + }, + "peerDependencies": { + "pg-native": ">=3.0.1" + }, + "peerDependenciesMeta": { + "pg-native": { + "optional": true + } + } + }, + "node_modules/pg-cloudflare": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz", + "integrity": "sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==", + "optional": true + }, + "node_modules/pg-connection-string": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.7.0.tgz", + "integrity": "sha512-PI2W9mv53rXJQEOb8xNR8lH7Hr+EKa6oJa38zsK0S/ky2er16ios1wLKhZyxzD7jUReiWokc9WK5nxSnC7W1TA==" + }, + "node_modules/pg-int8": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==", + "engines": { + "node": ">=4.0.0" + } + }, + "node_modules/pg-pool": { + "version": "3.7.0", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", + "integrity": "sha512-ZOBQForurqh4zZWjrgSwwAtzJ7QiRX0ovFkZr2klsen3Nm0aoh33Ls0fzfv3imeH/nw/O27cjdz5kzYJfeGp/g==", + "peerDependencies": { + "pg": ">=8.0" + } + }, + "node_modules/pg-protocol": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.7.0.tgz", + "integrity": "sha512-hTK/mE36i8fDDhgDFjy6xNOG+LCorxLG3WO17tku+ij6sVHXh1jQUJ8hYAnRhNla4QVD2H8er/FOjc/+EgC6yQ==" + }, + "node_modules/pg-types": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "dependencies": { + "pg-int8": "1.0.1", + "postgres-array": "~2.0.0", + "postgres-bytea": "~1.0.0", + "postgres-date": "~1.0.4", + "postgres-interval": "^1.1.0" + }, + "engines": { + "node": ">=4" + } + }, + "node_modules/pgpass": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", + "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", + "dependencies": { + "split2": "^4.1.0" + } + }, + "node_modules/postgres-array": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==", + "engines": { + "node": ">=4" + } + }, + "node_modules/postgres-bytea": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", + "integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-date": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==", + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/postgres-interval": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "dependencies": { + "xtend": "^4.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/process": { "version": "0.11.10", "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", @@ -1513,6 +1630,14 @@ "engines": { "node": ">=4.0" } + }, + "node_modules/xtend": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", + "engines": { + "node": ">=0.4" + } } }, "dependencies": { @@ -2261,6 +2386,89 @@ "resolved": "https://registry.npmjs.org/performance-now/-/performance-now-2.1.0.tgz", "integrity": "sha1-Ywn04OX6kT7BxpMHrjZLSzd8nns=" }, + "pg": { + "version": "8.13.1", + "resolved": "https://registry.npmjs.org/pg/-/pg-8.13.1.tgz", + "integrity": "sha512-OUir1A0rPNZlX//c7ksiu7crsGZTKSOXJPgtNiHGIlC9H0lO+NC6ZDYksSgBYY/thSWhnSRBv8w1lieNNGATNQ==", + "requires": { + "pg-cloudflare": "^1.1.1", + "pg-connection-string": "^2.7.0", + "pg-pool": "^3.7.0", + "pg-protocol": "^1.7.0", + "pg-types": "^2.1.0", + "pgpass": "1.x" + } + }, + "pg-cloudflare": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.1.1.tgz", + "integrity": "sha512-xWPagP/4B6BgFO+EKz3JONXv3YDgvkbVrGw2mTo3D6tVDQRh1e7cqVGvyR3BE+eQgAvx1XhW/iEASj4/jCWl3Q==", + "optional": true + }, + "pg-connection-string": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.7.0.tgz", + "integrity": "sha512-PI2W9mv53rXJQEOb8xNR8lH7Hr+EKa6oJa38zsK0S/ky2er16ios1wLKhZyxzD7jUReiWokc9WK5nxSnC7W1TA==" + }, + "pg-int8": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", + "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==" + }, + "pg-pool": { + "version": "3.7.0", + "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.7.0.tgz", + "integrity": "sha512-ZOBQForurqh4zZWjrgSwwAtzJ7QiRX0ovFkZr2klsen3Nm0aoh33Ls0fzfv3imeH/nw/O27cjdz5kzYJfeGp/g==", + "requires": {} + }, + "pg-protocol": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.7.0.tgz", + "integrity": "sha512-hTK/mE36i8fDDhgDFjy6xNOG+LCorxLG3WO17tku+ij6sVHXh1jQUJ8hYAnRhNla4QVD2H8er/FOjc/+EgC6yQ==" + }, + "pg-types": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", + "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", + "requires": { + "pg-int8": "1.0.1", + "postgres-array": "~2.0.0", + "postgres-bytea": "~1.0.0", + "postgres-date": "~1.0.4", + "postgres-interval": "^1.1.0" + } + }, + "pgpass": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", + "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", + "requires": { + "split2": "^4.1.0" + } + }, + "postgres-array": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", + "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==" + }, + "postgres-bytea": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", + "integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==" + }, + "postgres-date": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", + "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==" + }, + "postgres-interval": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", + "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", + "requires": { + "xtend": "^4.0.0" + } + }, "process": { "version": "0.11.10", "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", @@ -2654,6 +2862,11 @@ "version": "11.0.1", "resolved": "https://registry.npmjs.org/xmlbuilder/-/xmlbuilder-11.0.1.tgz", "integrity": "sha512-fDlsI/kFEx7gLvbecc0/ohLG50fugQp8ryHzMTuW9vSa1GJ0XYWKnhsUx7oie3G98+r56aTQIUB4kht42R3JvA==" + }, + "xtend": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", + "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==" } } } diff --git a/package.json b/package.json index deec4a9..e81fb89 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "mqtt": "^5.10.1", "nodemailer": "^6.4.11", "oracledb": "^4.2.0", + "pg": "^8.13.1", "request": "^2.88.2", "request-promise": "^4.2.6", "validate": "^5.1.0",