Функция считывания позиции очереди по идентификатору

This commit is contained in:
Mikhail Chechnev 2018-12-06 21:42:59 +03:00
parent dd44f0e5eb
commit b7a9c62429
4 changed files with 79 additions and 0 deletions

View File

@ -289,6 +289,39 @@ class DBConnector extends EventEmitter {
throw new ServerError(SERR_DB_EXECUTE, e.message); throw new ServerError(SERR_DB_EXECUTE, e.message);
} }
} }
//Считать запись очереди обмена
async getQueue(prms) {
if (this.bConnected) {
//Проверяем структуру переданных параметров
let sCheckResult = validateObject(
prms,
prmsDBConnectorSchema.getQueue,
"Параметры функции считывания записи очереди обмена"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Подготовим параметры
let getQueueData = _.cloneDeep(prms);
getQueueData.connection = this.connection;
try {
//Исполняем действие в БД
let res = await this.connector.getQueue(getQueueData);
//Валидируем полученный ответ
sCheckResult = validateObject(res, objQueueSchema.Queue, "Сообщение очереди обмена");
if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//Вернём считанную запись
return res;
} catch (e) {
if (e instanceof ServerError) throw e;
else throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} else {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Считать очередную порцию исходящих сообщений //Считать очередную порцию исходящих сообщений
async getOutgoing(prms) { async getOutgoing(prms) {
if (this.bConnected) { if (this.bConnected) {

View File

@ -74,6 +74,16 @@ exports.dbConnectorModule = new Schema({
required: "Не реализована функция протоколирования работы сервиса (log)" required: "Не реализована функция протоколирования работы сервиса (log)"
} }
}, },
//Считывание записи очереди обмена
getQueue: {
use: { validateAsyncFunctionType },
required: true,
message: {
validateAsyncFunctionType:
"Функция считывания записи очереди обмена (getQueue) имеет неверный формат (ожидалось - AsyncFunction)",
required: "Не реализована функция считывания записи очереди обмена (getQueue)"
}
},
//Считывание записей исходящих сообщений очереди //Считывание записей исходящих сообщений очереди
getQueueOutgoing: { getQueueOutgoing: {
use: { validateAsyncFunctionType }, use: { validateAsyncFunctionType },

View File

@ -134,6 +134,19 @@ exports.putLog = new Schema({
} }
}); });
//Схема валидации параметров функции считывания позиции очереди
exports.getQueue = new Schema({
//Идентификатор позиции очереди обмена
nQueueId: {
type: Number,
required: true,
message: {
type: "Идентификатор позиции очереди обмена (nQueueId) имеет некорректный тип данных (ожидалось - Number)",
required: "Не указан идентификатор позиции очереди обмена (nQueueId)"
}
}
});
//Схема валидации параметров функции считывания исходящих сообщений //Схема валидации параметров функции считывания исходящих сообщений
exports.getOutgoing = new Schema({ exports.getOutgoing = new Schema({
//Количество считываемых сообщений очереди //Количество считываемых сообщений очереди

View File

@ -107,6 +107,28 @@ const log = async prms => {
} }
}; };
//Считывание записи очереди обмена
const getQueue = async prms => {
try {
let res = await prms.connection.execute(
"BEGIN PKG_EXS.QUEUE_GET(NFLAG_SMART => 0, NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;",
{
NEXSQUEUE: prms.nQueueId,
RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
},
{
outFormat: oracledb.OBJECT,
autoCommit: true,
fetchInfo: { blMsg: { type: oracledb.BUFFER }, blResp: { type: oracledb.BUFFER } }
}
);
let rows = await readCursorData(res.outBinds.RCQUEUE);
return rows[0];
} catch (e) {
throw new Error(e.message);
}
};
//Считывание очередной порции исходящих сообщений из очереди //Считывание очередной порции исходящих сообщений из очереди
const getQueueOutgoing = async prms => { const getQueueOutgoing = async prms => {
try { try {
@ -233,6 +255,7 @@ exports.disconnect = disconnect;
exports.getServices = getServices; exports.getServices = getServices;
exports.getServiceFunctions = getServiceFunctions; exports.getServiceFunctions = getServiceFunctions;
exports.log = log; exports.log = log;
exports.getQueue = getQueue;
exports.getQueueOutgoing = getQueueOutgoing; exports.getQueueOutgoing = getQueueOutgoing;
exports.putQueueIncoming = putQueueIncoming; exports.putQueueIncoming = putQueueIncoming;
exports.setQueueState = setQueueState; exports.setQueueState = setQueueState;