Метод установки состояния позиции очереди и асинхронное чтение записей из курсора

This commit is contained in:
Mikhail Chechnev 2018-11-20 21:56:22 +03:00
parent d2fe21ea1b
commit 3c4bb16b0c

View File

@ -33,6 +33,26 @@ const SLOG_STATE_INF = "INF"; //Информация (строковый код)
const SLOG_STATE_WRN = "WRN"; //Предупреждение (строковые коды)
const SLOG_STATE_ERR = "ERR"; //Ошибка (строковый код)
// Состояния исполнения записей очереди сервиса
const NQUEUE_EXEC_STATE_INQUEUE = 0; //Поставлено в очередь
const NQUEUE_EXEC_STATE_APP = 1; //Обрабатывается сервером приложений
const NQUEUE_EXEC_STATE_APP_OK = 2; //Успешно обработано сервером приложений
const NQUEUE_EXEC_STATE_APP_ERR = 3; //Ошибка обработки сервером приложений
const NQUEUE_EXEC_STATE_DB = 4; //Обрабатывается СУБД
const NQUEUE_EXEC_STATE_DB_OK = 5; //Успешно обработано СУБД
const NQUEUE_EXEC_STATE_DB_ERR = 6; //Ошибка обработки СУБД
const NQUEUE_EXEC_STATE_OK = 7; //Обработано успешно
const NQUEUE_EXEC_STATE_ERR = 8; //Обработано с ошибками
const SQUEUE_EXEC_STATE_INQUEUE = "INQUEUE"; //Поставлено в очередь
const SQUEUE_EXEC_STATE_APP = "APP"; //Обрабатывается сервером приложений
const SQUEUE_EXEC_STATE_APP_OK = "APP_OK"; //Успешно обработано сервером приложений
const SQUEUE_EXEC_STATE_APP_ERR = "APP_ERR"; //Ошибка обработки сервером приложений
const SQUEUE_EXEC_STATE_DB = "DB"; //Обрабатывается СУБД
const SQUEUE_EXEC_STATE_DB_OK = "DB_OK"; //Успешно обработано СУБД
const SQUEUE_EXEC_STATE_DB_ERR = "DB_ERR"; //Ошибка обработки СУБД
const SQUEUE_EXEC_STATE_OK = "OK"; //Обработано успешно
const SQUEUE_EXEC_STATE_ERR = "ERR"; //Обработано с ошибками
//------------
// Тело модуля
//------------
@ -72,6 +92,23 @@ const disconnect = async prms => {
}
};
//Чтение данных из курсора
const readCursorData = cursor => {
return new Promise((resolve, reject) => {
let queryStream = cursor.toQueryStream();
let rows = [];
queryStream.on("data", row => {
rows.push(row);
});
queryStream.on("error", err => {
reject(new Error(err.message));
});
queryStream.on("close", () => {
resolve(rows);
});
});
};
//Получение списка сервисов
const getServices = prms => {
return new Promise((resolve, reject) => {
@ -231,7 +268,24 @@ const getQueueOutgoing = prms => {
const putQueueIncoming = prms => {};
//Установка значения в сообщении очереди
const setQueueValue = prms => {};
const setQueueState = async prms => {
try {
let res = await prms.connection.execute(
"BEGIN PKG_EXS.QUEUE_EXEC_STATE_SET(NEXSQUEUE => :NEXSQUEUE, NEXEC_STATE => :NEXEC_STATE, SEXEC_MSG => :SEXEC_MSG, RCQUEUE => :RCQUEUE); END;",
{
NEXSQUEUE: prms.nQueueId,
NEXEC_STATE: prms.nExecState,
SEXEC_MSG: prms.sExecMsg,
RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
},
{ outFormat: oracledb.OBJECT, autoCommit: true, fetchInfo: { bMsg: { type: oracledb.BUFFER } } }
);
let rows = await readCursorData(res.outBinds.RCQUEUE);
return rows;
} catch (e) {
throw new Error(e.message);
}
};
//-----------------
// Интерфейс модуля
@ -244,4 +298,4 @@ exports.getServiceFunctions = getServiceFunctions;
exports.log = log;
exports.getQueueOutgoing = getQueueOutgoing;
exports.putQueueIncoming = putQueueIncoming;
exports.setQueueValue = setQueueValue;
exports.setQueueState = setQueueState;