Функция добавления позиции очереди

This commit is contained in:
Mikhail Chechnev 2018-12-12 15:48:32 +03:00
parent 8d5d4e0c4c
commit 0fd015f86f
4 changed files with 106 additions and 16 deletions

View File

@ -322,6 +322,41 @@ class DBConnector extends EventEmitter {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
} }
} }
//Добавить запись очереди обмена
async putQueue(prms) {
if (this.bConnected) {
//Проверяем структуру переданных параметров
let sCheckResult = validateObject(
prms,
prmsDBConnectorSchema.putQueue,
"Параметры функции добавления позиции очереди"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Исполняем действие в БД
try {
let res = await this.connector.putQueue({
nServiceFnId: prms.nServiceFnId,
blMsg: prms.blMsg ? prms.blMsg : new Buffer(""),
nQueueId: prms.nQueueId,
connection: this.connection
});
//Валидируем полученный ответ
sCheckResult = validateObject(res, objQueueSchema.Queue, "Добавленное сообщение очереди обмена");
if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//Вернём добавленную запись
return res;
} catch (e) {
if (e instanceof ServerError) throw e;
else throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} else {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Считать очередную порцию исходящих сообщений //Считать очередную порцию исходящих сообщений
async getOutgoing(prms) { async getOutgoing(prms) {
if (this.bConnected) { if (this.bConnected) {

View File

@ -84,6 +84,16 @@ exports.dbConnectorModule = new Schema({
required: "Не реализована функция считывания записи очереди обмена (getQueue)" required: "Не реализована функция считывания записи очереди обмена (getQueue)"
} }
}, },
//Добавление сообщения очереди
putQueue: {
use: { validateAsyncFunctionType },
required: true,
message: {
validateAsyncFunctionType:
"Функция добавления сообщения очереди (putQueue) имеет неверный формат (ожидалось - AsyncFunction)",
required: "Не реализована функция добавления сообщения очереди (putQueue)"
}
},
//Считывание записей исходящих сообщений очереди //Считывание записей исходящих сообщений очереди
getQueueOutgoing: { getQueueOutgoing: {
use: { validateAsyncFunctionType }, use: { validateAsyncFunctionType },
@ -94,16 +104,6 @@ exports.dbConnectorModule = new Schema({
required: "Не реализована функция считывания записей исходящих сообщений очереди (getQueueOutgoing)" required: "Не реализована функция считывания записей исходящих сообщений очереди (getQueueOutgoing)"
} }
}, },
//Добавление входящего сообщения очереди
putQueueIncoming: {
use: { validateAsyncFunctionType },
required: true,
message: {
validateAsyncFunctionType:
"Функция добавления входящего сообщения очереди (putQueueIncoming) имеет неверный формат (ожидалось - AsyncFunction)",
required: "Не реализована функция добавления входящего сообщения очереди (putQueueIncoming)"
}
},
//Уставновка состояния записи очереди //Уставновка состояния записи очереди
setQueueState: { setQueueState: {
use: { validateAsyncFunctionType }, use: { validateAsyncFunctionType },

View File

@ -147,6 +147,40 @@ exports.getQueue = new Schema({
} }
}); });
//Схема валидации параметров функции добавления позиции очереди
exports.putQueue = new Schema({
//Идентификатор функции сервиса обработчика позиции очереди
nServiceFnId: {
type: Number,
required: true,
message: {
type: path =>
`Идентификатор функции сервиса обработчика позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор функции сервиса обработчика позиции очереди (${path})`
}
},
//Данные сообщения очереди обмена
blMsg: {
use: { validateBuffer },
required: false,
message: {
validateBuffer: path =>
`Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`,
required: path => `Не указаны данные сообщения очереди обмена (${path})`
}
},
//Идентификатор связанной позиции очереди обмена
nQueueId: {
type: Number,
required: false,
message: {
type: path =>
`Идентификатор связанной позиции очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор связанной позиции очереди обмена (${path})`
}
}
});
//Схема валидации параметров функции считывания исходящих сообщений //Схема валидации параметров функции считывания исходящих сообщений
exports.getOutgoing = new Schema({ exports.getOutgoing = new Schema({
//Количество считываемых сообщений очереди //Количество считываемых сообщений очереди
@ -223,8 +257,8 @@ exports.setQueueAppSrvResult = new Schema({
type: Number, type: Number,
required: true, required: true,
message: { message: {
type: "Идентификатор позиции очереди (nQueueId) имеет некорректный тип данных (ожидалось - Number)", type: path => `Идентификатор позиции очереди ((${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: "Не указан идентификатор позиции очереди (nQueueId)" required: path => `Не указан идентификатор позиции очереди ((${path})`
} }
}, },
//Данные сообщения очереди обмена //Данные сообщения очереди обмена

View File

@ -129,6 +129,30 @@ const getQueue = async prms => {
} }
}; };
//Помещение сообщения в очередь
const putQueue = async prms => {
try {
let res = await prms.connection.execute(
"BEGIN PKG_EXS.QUEUE_PUT(NEXSSERVICEFN => :NEXSSERVICEFN, BMSG => :BMSG, NEXSQUEUE => :NEXSQUEUE, RCQUEUE => :RCQUEUE); END;",
{
NEXSSERVICEFN: prms.nServiceFnId,
BMSG: prms.blMsg,
NEXSQUEUE: prms.nQueueId,
RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
},
{
outFormat: oracledb.OBJECT,
autoCommit: true,
fetchInfo: { blMsg: { type: oracledb.BUFFER } }
}
);
let rows = await readCursorData(res.outBinds.RCQUEUE);
return rows[0];
} catch (e) {
throw new Error(e.message);
}
};
//Считывание очередной порции исходящих сообщений из очереди //Считывание очередной порции исходящих сообщений из очереди
const getQueueOutgoing = async prms => { const getQueueOutgoing = async prms => {
try { try {
@ -150,9 +174,6 @@ const getQueueOutgoing = async prms => {
} }
}; };
//Помещение очередного входящего сообщения в очередь
const putQueueIncoming = async prms => {};
//Установка значения состояния в сообщении очереди //Установка значения состояния в сообщении очереди
const setQueueState = async prms => { const setQueueState = async prms => {
try { try {
@ -256,8 +277,8 @@ exports.getServices = getServices;
exports.getServiceFunctions = getServiceFunctions; exports.getServiceFunctions = getServiceFunctions;
exports.log = log; exports.log = log;
exports.getQueue = getQueue; exports.getQueue = getQueue;
exports.putQueue = putQueue;
exports.getQueueOutgoing = getQueueOutgoing; exports.getQueueOutgoing = getQueueOutgoing;
exports.putQueueIncoming = putQueueIncoming;
exports.setQueueState = setQueueState; exports.setQueueState = setQueueState;
exports.setQueueMsg = setQueueMsg; exports.setQueueMsg = setQueueMsg;
exports.setQueueResp = setQueueResp; exports.setQueueResp = setQueueResp;