считывание сообщений из очереди исходящих

This commit is contained in:
Mikhail Chechnev 2018-11-12 02:44:29 +03:00
parent 73c414010f
commit d26b0d49c8
4 changed files with 79 additions and 18 deletions

View File

@ -14,9 +14,17 @@ let dbConnect = {
//Пароль пользователя БД
password: "parus",
//Строка подключения к БД
connectString: "DEMOP_CITKSERV",
connectString: "DEMOP_CITKSERV_WAN",
//Модуль обслуживания БД
module: "parus_db.js"
module: "parus_oracle_db.js"
};
//Параметры обработки очереди исходящих сообщений
let outgoing = {
//Размер блока одновременно обрабатываемых исходящих сообщений
portionSize: 1,
//Скорость проверки наличия исходящих сообщений (мс)
checkTimeout: 500
};
//-----------------
@ -24,5 +32,6 @@ let dbConnect = {
//-----------------
module.exports = {
dbConnect
dbConnect,
outgoing
};

View File

@ -17,9 +17,9 @@ const { ServerError } = require("@core/server_errors.js"); //Типовая ош
//----------
//Состояния записей журнала работы сервиса
const NLOG_STATE_INF = 0; //Информация
const NLOG_STATE_WRN = 1; //Предупреждение
const NLOG_STATE_ERR = 2; //Ошибка
const MSG_TYPE_INF = 0; //Информация
const MSG_TYPE_WRN = 1; //Предупреждение
const MSG_TYPE_ERR = 2; //Ошибка
//------------
// Тело модуля
@ -84,7 +84,11 @@ class DBConnector {
//Подключиться к БД
async connect() {
try {
this.connection = await this.connector.connect(this.connectSettings);
this.connection = await this.connector.connect(
this.connectSettings.user,
this.connectSettings.password,
this.connectSettings.connectString
);
return this.connection;
} catch (e) {
throw new ServerError(glConst.ERR_DB_CONNECT, e.message);
@ -128,13 +132,22 @@ class DBConnector {
throw new ServerError(glConst.ERR_DB_EXECUTE, e.message);
}
}
//Считать очередную порцию исходящих сообщений
async getOutgoing(portionSize) {
try {
let res = await this.connector.getQueueOutgoing(this.connection, portionSize);
return res;
} catch (e) {
throw new ServerError(glConst.ERR_DB_EXECUTE, e.message);
}
}
}
//-----------------
// Интерфейс модуля
//-----------------
exports.NLOG_STATE_INF = NLOG_STATE_INF;
exports.NLOG_STATE_WRN = NLOG_STATE_WRN;
exports.NLOG_STATE_ERR = NLOG_STATE_ERR;
exports.MSG_TYPE_INF = MSG_TYPE_INF;
exports.MSG_TYPE_WRN = MSG_TYPE_WRN;
exports.MSG_TYPE_ERR = MSG_TYPE_ERR;
exports.DBConnector = DBConnector;

View File

@ -23,10 +23,16 @@ try {
a.connect()
.then(res => {
console.log("CONNECTED");
a.getServices()
a.getOutgoing(cfg.outgoing.portionSize)
.then(res => {
console.log(res);
a.putLog(db.NLOG_STATE_WRN, "Сервер приложений подключен")
if (res.length > 0) {
res.map(r => {
console.log(r);
});
} else {
console.log("NO MESSAGES IN QUEUE!!!");
}
a.putLog(db.MSG_TYPE_INF, "Сервер приложений подключен")
.then(res => {
console.log(res);
a.disconnect()

View File

@ -38,12 +38,12 @@ const SLOG_STATE_ERR = "ERR"; //Ошибка (строковый код)
//------------
//Подключение к БД
const connect = async prms => {
const connect = async (user, password, connectString) => {
try {
const conn = await oracledb.getConnection({
user: prms.user,
password: prms.password,
connectString: prms.connectString
user,
password,
connectString
});
return conn;
} catch (e) {
@ -170,7 +170,40 @@ const log = (connection, logState, msg, queueID) => {
};
//Считывание очередной порции исходящих сообщений из очереди
const getQueueOutgoing = prms => {};
const getQueueOutgoing = (connection, portionSize) => {
return new Promise((resolve, reject) => {
if (connection) {
connection.execute(
"BEGIN PKG_EXS.QUEUE_OUT_GET(NPORTION => :NPORTION, RCQUEUES => :RCQUEUES); END;",
{
NPORTION: portionSize,
RCQUEUES: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
},
{ outFormat: oracledb.OBJECT, autoCommit: true, fetchInfo: { BMSG: { type: oracledb.BUFFER } } },
(err, result) => {
if (err) {
reject(new Error(err.message));
} else {
let cursor = result.outBinds.RCQUEUES;
let queryStream = cursor.toQueryStream();
let rows = [];
queryStream.on("data", row => {
rows.push(row);
});
queryStream.on("error", err => {
reject(new Error(err.message));
});
queryStream.on("close", () => {
resolve(rows);
});
}
}
);
} else {
reject(new Error("Не указано подключение"));
}
});
};
//Помещение очередного входящего сообщения в очередь
const putQueueIncoming = prms => {};