diff --git a/modules/parus_oracle_db.js b/modules/parus_oracle_db.js index 1afc378..feb2a6a 100644 --- a/modules/parus_oracle_db.js +++ b/modules/parus_oracle_db.js @@ -9,6 +9,12 @@ const oracledb = require("oracledb"); //Работа с СУБД Oracle +//-------------------------- +// Глобальные идентификаторы +//-------------------------- + +const NPOOL_DRAIN_TIME = 30; //Таймаут ожидания завершения подключений при отключении пула от БД (сек) + //------------ // Тело модуля //------------ @@ -33,14 +39,23 @@ const readCursorData = cursor => { //Подключение к БД const connect = async prms => { try { - const conn = await oracledb.getConnection({ + let pool = await oracledb.createPool({ user: prms.sUser, password: prms.sPassword, - connectString: prms.sConnectString + connectString: prms.sConnectString, + sessionCallback: (connection, requestedTag, callback) => { + if (prms.sSessionAppName) connection.module = prms.sSessionAppName; + connection.execute(`ALTER SESSION SET CURRENT_SCHEMA=${prms.sSchema}`).then( + r => { + callback(null, connection); + }, + e => { + callback(e, null); + } + ); + } }); - if (prms.sSessionAppName) conn.module = prms.sSessionAppName; - await conn.execute(`ALTER SESSION SET CURRENT_SCHEMA=${prms.sSchema}`); - return conn; + return pool; } catch (e) { throw new Error(e.message); } @@ -49,7 +64,7 @@ const connect = async prms => { //Отключение от БД const disconnect = async prms => { try { - await prms.connection.close(); + await prms.connection.close(NPOOL_DRAIN_TIME); return; } catch (e) { throw new Error(e.message); @@ -58,8 +73,10 @@ const disconnect = async prms => { //Получение списка сервисов const getServices = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.SERVICES_GET(RCSERVICES => :RCSERVICES); END;", { RCSERVICES: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, { outFormat: oracledb.OBJECT } @@ -68,13 +85,23 @@ const getServices = async prms => { return rows; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Получение списка функций сервиса const getServiceFunctions = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.SERVICEFNS_GET(NEXSSERVICE => :NEXSSERVICE, RCSERVICEFNS => :RCSERVICEFNS); END;", { NEXSSERVICE: prms.nServiceId, RCSERVICEFNS: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, { outFormat: oracledb.OBJECT } @@ -83,13 +110,23 @@ const getServiceFunctions = async prms => { return rows; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Получение контекста сервиса const getServiceContext = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.SERVICE_CTX_GET(NFLAG_SMART => 0, NEXSSERVICE => :NEXSSERVICE, RCSERVICE_CTX => :RCSERVICE_CTX); END;", { NEXSSERVICE: prms.nServiceId, RCSERVICE_CTX: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } }, { outFormat: oracledb.OBJECT } @@ -98,65 +135,115 @@ const getServiceContext = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Установка контекста сервиса const setServiceContext = async prms => { + let pooledConnection; try { - await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + await pooledConnection.execute( "BEGIN PKG_EXS.SERVICE_CTX_SET(NEXSSERVICE => :NEXSSERVICE, SCTX => :SCTX, DCTX_EXP => :DCTX_EXP); END;", { NEXSSERVICE: prms.nServiceId, SCTX: prms.sCtx, DCTX_EXP: prms.dCtxExp }, { autoCommit: true } ); } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Очистка контекста сервиса const clearServiceContext = async prms => { + let pooledConnection; try { - await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + await pooledConnection.execute( "BEGIN PKG_EXS.SERVICE_CTX_CLEAR(NEXSSERVICE => :NEXSSERVICE); END;", { NEXSSERVICE: prms.nServiceId }, { autoCommit: true } ); } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Проверка атуентифицированности сервиса const isServiceAuth = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN :RET := PKG_EXS.SERVICE_IS_AUTH(NEXSSERVICE => :NEXSSERVICE); END;", { NEXSSERVICE: prms.nServiceId, RET: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER } } ); return res.outBinds.RET; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Постановка в очередь задания на аутентификацию сервиса const putServiceAuthInQueue = async prms => { + let pooledConnection; try { - await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + await pooledConnection.execute( "BEGIN PKG_EXS.SERVICE_AUTH_PUT_INQUEUE(NEXSSERVICE => :NEXSSERVICE, NFORCE => :NFORCE); END;", { NEXSSERVICE: prms.nServiceId, NFORCE: prms.nForce }, { autoCommit: true } ); } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Получение информации о просроченных сообщениях обмена сервиса const getServiceExpiredQueueInfo = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.SERVICE_QUEUE_EXPIRED_INFO_GET(NEXSSERVICE => :NEXSSERVICE, RCSERVICE_QUEUE_EXPIRED_INFO => :RCSERVICE_QUEUE_EXPIRED_INFO); END;", { NEXSSERVICE: prms.nServiceId, @@ -168,13 +255,23 @@ const getServiceExpiredQueueInfo = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Запись в протокол работы const log = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.LOG_PUT(NLOG_STATE => :NLOG_STATE, SMSG => :SMSG, NEXSSERVICE => :NEXSSERVICE, NEXSSERVICEFN => :NEXSSERVICEFN, NEXSQUEUE => :NEXSQUEUE, RCLOG => :RCLOG); END;", { NLOG_STATE: prms.nLogState, @@ -190,13 +287,23 @@ const log = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Считывание записи очереди обмена const getQueue = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_GET(NFLAG_SMART => 0, NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;", { NEXSQUEUE: prms.nQueueId, @@ -208,13 +315,23 @@ const getQueue = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Помещение сообщения в очередь const putQueue = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_PUT(NEXSSERVICEFN => :NEXSSERVICEFN, BMSG => :BMSG, NEXSQUEUE => :NEXSQUEUE, NLNK_COMPANY => :NLNK_COMPANY, NLNK_DOCUMENT => :NLNK_DOCUMENT, SLNK_UNITCODE => :SLNK_UNITCODE, SOPTIONS => :SOPTIONS, RCQUEUE => :RCQUEUE); END;", { NEXSSERVICEFN: prms.nServiceFnId, @@ -232,13 +349,23 @@ const putQueue = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_SRV_TYPE_SEND_GET(NPORTION_SIZE => :NPORTION_SIZE, RCQUEUES => :RCQUEUES); END;", { NPORTION_SIZE: prms.nPortionSize, @@ -250,13 +377,23 @@ const getQueueOutgoing = async prms => { return rows; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Установка значения состояния в сообщении очереди const setQueueState = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_EXEC_STATE_SET(NEXSQUEUE => :NEXSQUEUE, NEXEC_STATE => :NEXEC_STATE, SEXEC_MSG => :SEXEC_MSG, NINC_EXEC_CNT => :NINC_EXEC_CNT, NRESET_DATA => :NRESET_DATA, RCQUEUE => :RCQUEUE); END;", { NEXSQUEUE: prms.nQueueId, @@ -272,13 +409,23 @@ const setQueueState = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Считывание данных сообщения очереди const getQueueMsg = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_MSG_GET(NEXSQUEUE => :NEXSQUEUE, RCQUEUE_MSG => :RCQUEUE_MSG); END;", { NEXSQUEUE: prms.nQueueId, @@ -294,13 +441,23 @@ const getQueueMsg = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Установка данных сообщения очереди const setQueueMsg = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_MSG_SET(NEXSQUEUE => :NEXSQUEUE, BMSG => :BMSG, RCQUEUE => :RCQUEUE); END;", { NEXSQUEUE: prms.nQueueId, @@ -313,13 +470,23 @@ const setQueueMsg = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Установка параметров сообщения очереди const setQueueOptions = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_OPTIONS_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS => :SOPTIONS, RCQUEUE => :RCQUEUE); END;", { NEXSQUEUE: prms.nQueueId, @@ -332,13 +499,23 @@ const setQueueOptions = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Считывание результата обработки сообщения очереди const getQueueResp = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_RESP_GET(NEXSQUEUE => :NEXSQUEUE, RCQUEUE_RESP => :RCQUEUE_RESP); END;", { NEXSQUEUE: prms.nQueueId, @@ -354,13 +531,23 @@ const getQueueResp = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Установка результата обработки сообщения очереди const setQueueResp = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_RESP_SET(NEXSQUEUE => :NEXSQUEUE, BRESP => :BRESP, NIS_ORIGINAL => :NIS_ORIGINAL, RCQUEUE => :RCQUEUE); END;", { NEXSQUEUE: prms.nQueueId, @@ -374,13 +561,23 @@ const setQueueResp = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Установка параметров результата обработки сообщения очереди const setQueueOptionsResp = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_OPTIONS_RESP_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS_RESP => :SOPTIONS_RESP, RCQUEUE => :RCQUEUE); END;", { NEXSQUEUE: prms.nQueueId, @@ -393,13 +590,23 @@ const setQueueOptionsResp = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } }; //Исполнение обработчика со стороны БД для сообщения очереди const execQueuePrc = async prms => { + let pooledConnection; try { - let res = await prms.connection.execute( + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( "BEGIN PKG_EXS.QUEUE_PRC(NEXSQUEUE => :NEXSQUEUE, RCRESULT => :RCRESULT); END;", { NEXSQUEUE: prms.nQueueId, @@ -411,6 +618,14 @@ const execQueuePrc = async prms => { return rows[0]; } catch (e) { throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } } };