diff --git a/config.js b/config.js index 9000b25..aeb8aef 100644 --- a/config.js +++ b/config.js @@ -14,9 +14,17 @@ let dbConnect = { //Пароль пользователя БД password: "parus", //Строка подключения к БД - connectString: "DEMOP_CITKSERV", + connectString: "DEMOP_CITKSERV_WAN", //Модуль обслуживания БД - module: "parus_db.js" + module: "parus_oracle_db.js" +}; + +//Параметры обработки очереди исходящих сообщений +let outgoing = { + //Размер блока одновременно обрабатываемых исходящих сообщений + portionSize: 1, + //Скорость проверки наличия исходящих сообщений (мс) + checkTimeout: 500 }; //----------------- @@ -24,5 +32,6 @@ let dbConnect = { //----------------- module.exports = { - dbConnect + dbConnect, + outgoing }; diff --git a/core/db_connector.js b/core/db_connector.js index a7dff4c..fee61cf 100644 --- a/core/db_connector.js +++ b/core/db_connector.js @@ -17,9 +17,9 @@ const { ServerError } = require("@core/server_errors.js"); //Типовая ош //---------- //Состояния записей журнала работы сервиса -const NLOG_STATE_INF = 0; //Информация -const NLOG_STATE_WRN = 1; //Предупреждение -const NLOG_STATE_ERR = 2; //Ошибка +const MSG_TYPE_INF = 0; //Информация +const MSG_TYPE_WRN = 1; //Предупреждение +const MSG_TYPE_ERR = 2; //Ошибка //------------ // Тело модуля @@ -84,7 +84,11 @@ class DBConnector { //Подключиться к БД async connect() { try { - this.connection = await this.connector.connect(this.connectSettings); + this.connection = await this.connector.connect( + this.connectSettings.user, + this.connectSettings.password, + this.connectSettings.connectString + ); return this.connection; } catch (e) { throw new ServerError(glConst.ERR_DB_CONNECT, e.message); @@ -128,13 +132,22 @@ class DBConnector { throw new ServerError(glConst.ERR_DB_EXECUTE, e.message); } } + //Считать очередную порцию исходящих сообщений + async getOutgoing(portionSize) { + try { + let res = await this.connector.getQueueOutgoing(this.connection, portionSize); + return res; + } catch (e) { + throw new ServerError(glConst.ERR_DB_EXECUTE, e.message); + } + } } //----------------- // Интерфейс модуля //----------------- -exports.NLOG_STATE_INF = NLOG_STATE_INF; -exports.NLOG_STATE_WRN = NLOG_STATE_WRN; -exports.NLOG_STATE_ERR = NLOG_STATE_ERR; +exports.MSG_TYPE_INF = MSG_TYPE_INF; +exports.MSG_TYPE_WRN = MSG_TYPE_WRN; +exports.MSG_TYPE_ERR = MSG_TYPE_ERR; exports.DBConnector = DBConnector; diff --git a/index.js b/index.js index d3e7c9d..065c1e4 100644 --- a/index.js +++ b/index.js @@ -23,10 +23,16 @@ try { a.connect() .then(res => { console.log("CONNECTED"); - a.getServices() + a.getOutgoing(cfg.outgoing.portionSize) .then(res => { - console.log(res); - a.putLog(db.NLOG_STATE_WRN, "Сервер приложений подключен") + if (res.length > 0) { + res.map(r => { + console.log(r); + }); + } else { + console.log("NO MESSAGES IN QUEUE!!!"); + } + a.putLog(db.MSG_TYPE_INF, "Сервер приложений подключен") .then(res => { console.log(res); a.disconnect() diff --git a/modules/parus_db.js b/modules/parus_oracle_db.js similarity index 82% rename from modules/parus_db.js rename to modules/parus_oracle_db.js index a5851ba..327dfad 100644 --- a/modules/parus_db.js +++ b/modules/parus_oracle_db.js @@ -38,12 +38,12 @@ const SLOG_STATE_ERR = "ERR"; //Ошибка (строковый код) //------------ //Подключение к БД -const connect = async prms => { +const connect = async (user, password, connectString) => { try { const conn = await oracledb.getConnection({ - user: prms.user, - password: prms.password, - connectString: prms.connectString + user, + password, + connectString }); return conn; } catch (e) { @@ -170,7 +170,40 @@ const log = (connection, logState, msg, queueID) => { }; //Считывание очередной порции исходящих сообщений из очереди -const getQueueOutgoing = prms => {}; +const getQueueOutgoing = (connection, portionSize) => { + return new Promise((resolve, reject) => { + if (connection) { + connection.execute( + "BEGIN PKG_EXS.QUEUE_OUT_GET(NPORTION => :NPORTION, RCQUEUES => :RCQUEUES); END;", + { + NPORTION: portionSize, + RCQUEUES: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { outFormat: oracledb.OBJECT, autoCommit: true, fetchInfo: { BMSG: { type: oracledb.BUFFER } } }, + (err, result) => { + if (err) { + reject(new Error(err.message)); + } else { + let cursor = result.outBinds.RCQUEUES; + let queryStream = cursor.toQueryStream(); + let rows = []; + queryStream.on("data", row => { + rows.push(row); + }); + queryStream.on("error", err => { + reject(new Error(err.message)); + }); + queryStream.on("close", () => { + resolve(rows); + }); + } + } + ); + } else { + reject(new Error("Не указано подключение")); + } + }); +}; //Помещение очередного входящего сообщения в очередь const putQueueIncoming = prms => {};