/* Сервис интеграции ПП Парус 8 с WEB API Дополнительный модуль: работа с БД ПП Парус 8 (Oracle) */ //---------------------- // Подключение библиотек //---------------------- const oracledb = require("oracledb"); //Работа с СУБД Oracle //-------------------------- // Глобальные идентификаторы //-------------------------- const NPOOL_DRAIN_TIME = 30; //Таймаут ожидания завершения подключений при отключении пула от БД (сек) const DT_VARCHAR = oracledb.DB_TYPE_VARCHAR; //Тип данных "Строка" БД const DT_NUMBER = oracledb.DB_TYPE_NUMBER; //Тип данных "Число" БД const DT_DATE = oracledb.DB_TYPE_DATE; //Тип данных "Дата" БД const DT_CLOB = oracledb.DB_TYPE_CLOB; //Тип данных "Текстовые данные" БД const DT_BLOB = oracledb.DB_TYPE_BLOB; //Тип данных "Двоичные данные" БД const DT_CURSOR = oracledb.CURSOR; //Тип данных "Курсор" БД //------------ // Тело модуля //------------ //Формирование имени хранимого объекта БД (с параметрами) const makeStoredName = (sName, inPrms, outPrms, isFunction = false) => { let prms = ""; let resultVar = ""; for (i in inPrms) prms += `${prms ? ", " : ""}${i} => :${i}`; for (i in outPrms) { if (isFunction) resultVar = `:${i} := `; else prms += `${prms ? ", " : ""}${i} => :${i}`; break; } return `${resultVar ? resultVar : ""}${sName.replace(/\$[0-9]{1,9}$/, "").replace("$", ".")}(${prms})`; }; //Формирование параметров хранимого объекта БД const makeStoredPrms = (inPrms, outPrms) => { let prms = inPrms ? inPrms : {}; for (i in outPrms) { prms[i] = { type: outPrms[i], dir: oracledb.BIND_OUT }; break; } return prms; }; //Исполнение хранимого объекта БД const executeStored = async prms => { let pooledConnection; let outResult = {}; try { pooledConnection = await prms.connection.getConnection(); let outPrmName = ""; let outPrmIsCursor = false; for (i in prms.outPrms) { outPrmName = i; outPrmIsCursor = prms.outPrms[i] == DT_CURSOR; break; } let res = await pooledConnection.execute( `begin ${makeStoredName(prms.sName, prms.inPrms, prms.outPrms, prms.isFunction)}; end;`, makeStoredPrms(prms.inPrms, prms.outPrms), { ...(outPrmIsCursor ? { outFormat: oracledb.OBJECT } : {}), ...{ autoCommit: true } } ); if (res) if (res.outBinds) { const outBind = res.outBinds[outPrmName]; if (outPrmIsCursor) { const readCursorData = cursor => { return new Promise((resolve, reject) => { let queryStream = cursor.toQueryStream(); let rows = []; queryStream.on("data", row => { rows.push(row); }); queryStream.on("error", err => { queryStream.destroy(); reject(new Error(err.message)); }); queryStream.on("close", () => { queryStream.destroy(); resolve(rows); }); }); }; let rows = await readCursorData(outBind); let rowsRes = []; await Promise.all( rows.map(async row => { let rowRes = {}; for (i in row) { let isLob = row[i] ? (row[i].type ? row[i].type == oracledb.BLOB || row[i].type == oracledb.CLOB : false) : false; rowRes[i] = isLob ? await row[i].getData() : row[i]; } rowsRes.push(rowRes); }) ); outResult[outPrmName] = rowsRes; } else outResult[outPrmName] = outBind; } } catch (e) { throw new Error(e.message); } finally { if (pooledConnection) { try { await pooledConnection.close(); } catch (e) { throw new Error(e.message); } } } if (outResult) return outResult; }; //Проверка допустимого количества воркеров const checkWorkers = async prms => { let workersData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.UTL_LIC_CLNT_COUNT_GET", outPrms: { LIC_CNT: DT_NUMBER }, isFunction: true }); if (workersData.LIC_CNT === 0) { throw new Error(`Не определено количество лицензий для приложения "ExchangeServer".`); } if (prms.nMaxWorkers > workersData.LIC_CNT - 1) { throw new Error( `Недопустимое значение параметра "Количество одновременно обрабатываемых исходящих сообщений" ("outGoing.nMaxWorkers") файла конфигурации сервиса приложений ("config.js"). Максимальное количество одновременно обрабатываемых исходящих сообщений - ${ workersData.LIC_CNT - 1 }.` ); } }; //Проверка соответствия релизов сервера приложений и системы const checkRelease = async prms => { let releaseData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.UTL_PRODUCT_RELEASE_GET", outPrms: { DB_RELEASE: DT_VARCHAR }, isFunction: true }); if (releaseData.DB_RELEASE !== prms.sRelease) { throw new Error(`Версия сервера приложений (${prms.sRelease}) не соответствует версии системы (${releaseData.DB_RELEASE}).`); } }; //Подключение к БД const connect = async prms => { try { let pool = await oracledb.createPool({ user: prms.sUser, password: prms.sPassword, connectString: prms.sConnectString, queueTimeout: 600000, poolMin: prms.nPoolMin ? prms.nPoolMin : 4, poolMax: prms.nPoolMax ? prms.nPoolMax : 4, poolIncrement: prms.nPoolIncrement ? prms.nPoolIncrement : 0, sessionCallback: (connection, requestedTag, callback) => { if (prms.sSessionAppName) connection.module = prms.sSessionAppName; connection.execute(`ALTER SESSION SET CURRENT_SCHEMA=${prms.sSchema}`).then( r => { callback(null, connection); }, e => { callback(e, null); } ); } }); if (prms.bControlSystemVersion) { await checkRelease({ sRelease: prms.sRelease, connection: pool }); } await checkWorkers({ nMaxWorkers: prms.nMaxWorkers, connection: pool }); return pool; } catch (e) { throw new Error(e.message); } }; //Отключение от БД const disconnect = async prms => { try { await prms.connection.close(NPOOL_DRAIN_TIME); return; } catch (e) { throw new Error(e.message); } }; //Получение списка сервисов const getServices = async prms => { let servicesData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICES_GET", outPrms: { RCSERVICES: DT_CURSOR } }); return servicesData.RCSERVICES; }; //Получение списка функций сервиса const getServiceFunctions = async prms => { let serviceFunctionsData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICEFNS_GET", inPrms: { NEXSSERVICE: prms.nServiceId }, outPrms: { RCSERVICEFNS: DT_CURSOR } }); return serviceFunctionsData.RCSERVICEFNS; }; //Получение контекста сервиса const getServiceContext = async prms => { let serviceContextData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICE_CTX_GET", inPrms: { NFLAG_SMART: 0, NEXSSERVICE: prms.nServiceId }, outPrms: { RCSERVICE_CTX: DT_CURSOR } }); return serviceContextData.RCSERVICE_CTX[0]; }; //Установка контекста сервиса const setServiceContext = async prms => { await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICE_CTX_SET", inPrms: { NEXSSERVICE: prms.nServiceId, SCTX: prms.sCtx, DCTX_EXP: prms.dCtxExp } }); }; //Очистка контекста сервиса const clearServiceContext = async prms => { await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICE_CTX_CLEAR", inPrms: { NEXSSERVICE: prms.nServiceId } }); }; //Проверка атуентифицированности сервиса const isServiceAuth = async prms => { let serviceAuth = await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICE_IS_AUTH", inPrms: { NEXSSERVICE: prms.nServiceId }, outPrms: { RET: DT_NUMBER }, isFunction: true }); return serviceAuth.RET; }; //Постановка в очередь задания на аутентификацию сервиса const putServiceAuthInQueue = async prms => { await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICE_AUTH_PUT_INQUEUE_AT", inPrms: { NEXSSERVICE: prms.nServiceId, NFORCE: prms.nForce } }); }; //Получение информации о просроченных сообщениях обмена сервиса const getServiceExpiredQueueInfo = async prms => { let serviceExpiredQueueInfoData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.SERVICE_QUEUE_EXPIRED_INFO_GET", inPrms: { NEXSSERVICE: prms.nServiceId }, outPrms: { RCSERVICE_QUEUE_EXPIRED_INFO: DT_CURSOR } }); return serviceExpiredQueueInfoData.RCSERVICE_QUEUE_EXPIRED_INFO[0]; }; //Запись в протокол работы const log = async prms => { let logData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.LOG_PUT", inPrms: { NLOG_STATE: prms.nLogState, SMSG: prms.sMsg, NEXSSERVICE: prms.nServiceId, NEXSSERVICEFN: prms.nServiceFnId, NEXSQUEUE: prms.nQueueId }, outPrms: { RCLOG: DT_CURSOR } }); return logData.RCLOG[0]; }; //Считывание записи очереди обмена const getQueue = async prms => { let queueData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_GET", inPrms: { NFLAG_SMART: 0, NEXSQUEUE: prms.nQueueId }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueData.RCQUEUE[0]; }; //Помещение сообщения в очередь const putQueue = async prms => { let queueData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_PUT", inPrms: { NEXSSERVICEFN: prms.nServiceFnId, BMSG: prms.blMsg, NEXSQUEUE: prms.nQueueId, NLNK_COMPANY: prms.nLnkCompanyId, NLNK_DOCUMENT: prms.nLnkDocumentId, SLNK_UNITCODE: prms.sLnkUnitcode, SOPTIONS: prms.sOptions }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueData.RCQUEUE[0]; }; //Считывание очередной порции исходящих сообщений из очереди const getQueueOutgoing = async prms => { let queueOutgoingData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_SRV_TYPE_SEND_GET", inPrms: { NPORTION_SIZE: prms.nPortionSize }, outPrms: { RCQUEUES: DT_CURSOR } }); return queueOutgoingData.RCQUEUES; }; //Установка значения состояния в сообщении очереди const setQueueState = async prms => { let queueStateData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_EXEC_STATE_SET", inPrms: { NEXSQUEUE: prms.nQueueId, NEXEC_STATE: prms.nExecState, SEXEC_MSG: prms.sExecMsg, NINC_EXEC_CNT: prms.nIncExecCnt, NRESET_DATA: prms.nResetData }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueStateData.RCQUEUE[0]; }; //Считывание данных сообщения очереди const getQueueMsg = async prms => { let queueMsgData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_MSG_GET", inPrms: { NEXSQUEUE: prms.nQueueId }, outPrms: { RCQUEUE_MSG: DT_CURSOR } }); return queueMsgData.RCQUEUE_MSG[0]; }; //Установка данных сообщения очереди const setQueueMsg = async prms => { let queueMsgData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_MSG_SET", inPrms: { NEXSQUEUE: prms.nQueueId, BMSG: prms.blMsg }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueMsgData.RCQUEUE[0]; }; //Установка параметров сообщения очереди const setQueueOptions = async prms => { let queueOptionsData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_OPTIONS_SET", inPrms: { NEXSQUEUE: prms.nQueueId, SOPTIONS: prms.sOptions }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueOptionsData.RCQUEUE[0]; }; //Считывание результата обработки сообщения очереди const getQueueResp = async prms => { let queueRespData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_RESP_GET", inPrms: { NEXSQUEUE: prms.nQueueId }, outPrms: { RCQUEUE_RESP: DT_CURSOR } }); return queueRespData.RCQUEUE_RESP[0]; }; //Установка результата обработки сообщения очереди const setQueueResp = async prms => { let queueRespData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_RESP_SET", inPrms: { NEXSQUEUE: prms.nQueueId, BRESP: prms.blResp, NIS_ORIGINAL: prms.nIsOriginal }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueRespData.RCQUEUE[0]; }; //Установка параметров результата обработки сообщения очереди const setQueueOptionsResp = async prms => { let queueOptionsRespData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_OPTIONS_RESP_SET", inPrms: { NEXSQUEUE: prms.nQueueId, SOPTIONS_RESP: prms.sOptionsResp }, outPrms: { RCQUEUE: DT_CURSOR } }); return queueOptionsRespData.RCQUEUE[0]; }; //Исполнение обработчика со стороны БД для сообщения очереди const execQueuePrc = async prms => { let queuePrcData = await executeStored({ connection: prms.connection, sName: "PKG_EXS.QUEUE_PRC", inPrms: { NEXSQUEUE: prms.nQueueId }, outPrms: { RCRESULT: DT_CURSOR } }); return queuePrcData.RCRESULT[0]; }; //----------------- // Интерфейс модуля //----------------- exports.DT_VARCHAR = DT_VARCHAR; exports.DT_NUMBER = DT_NUMBER; exports.DT_DATE = DT_DATE; exports.DT_CLOB = DT_CLOB; exports.DT_BLOB = DT_BLOB; exports.DT_CURSOR = DT_CURSOR; exports.executeStored = executeStored; exports.connect = connect; exports.disconnect = disconnect; exports.getServices = getServices; exports.getServiceFunctions = getServiceFunctions; exports.getServiceContext = getServiceContext; exports.setServiceContext = setServiceContext; exports.clearServiceContext = clearServiceContext; exports.isServiceAuth = isServiceAuth; exports.putServiceAuthInQueue = putServiceAuthInQueue; exports.getServiceExpiredQueueInfo = getServiceExpiredQueueInfo; exports.log = log; exports.getQueue = getQueue; exports.putQueue = putQueue; exports.getQueueOutgoing = getQueueOutgoing; exports.setQueueState = setQueueState; exports.getQueueMsg = getQueueMsg; exports.setQueueMsg = setQueueMsg; exports.setQueueOptions = setQueueOptions; exports.getQueueResp = getQueueResp; exports.setQueueResp = setQueueResp; exports.setQueueOptionsResp = setQueueOptionsResp; exports.execQueuePrc = execQueuePrc;