Функции установки параметров (HTP-заголовка и параметров отправки) сообщений и ответов для очереди + доработка модели очереди для считывания параметров сообщения и ответа на него
This commit is contained in:
parent
6bb8b46682
commit
43fd7de6d8
@ -689,6 +689,39 @@ class DBConnector extends EventEmitter {
|
|||||||
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
|
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//Запись параметров сообщения в позицию очереди
|
||||||
|
async setQueueOptions(prms) {
|
||||||
|
if (this.bConnected) {
|
||||||
|
//Проверяем структуру переданных параметров
|
||||||
|
let sCheckResult = validateObject(
|
||||||
|
prms,
|
||||||
|
prmsDBConnectorSchema.setQueueOptions,
|
||||||
|
"Параметры функции сохранения параметров сообщения позиции очереди"
|
||||||
|
);
|
||||||
|
//Если структура объекта в норме
|
||||||
|
if (!sCheckResult) {
|
||||||
|
//Подготовим параметры
|
||||||
|
let setQueueOptionsData = _.cloneDeep(prms);
|
||||||
|
setQueueOptionsData.connection = this.connection;
|
||||||
|
//Исполняем действие в БД
|
||||||
|
try {
|
||||||
|
let res = await this.connector.setQueueOptions(setQueueOptionsData);
|
||||||
|
//Валидируем полученный ответ
|
||||||
|
sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена");
|
||||||
|
if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||||||
|
//Вернём измененную запись
|
||||||
|
return res;
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof ServerError) throw e;
|
||||||
|
else throw new ServerError(SERR_DB_EXECUTE, e.message);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
|
||||||
|
}
|
||||||
|
}
|
||||||
//Считывание ответа на сообщение из позиции очереди
|
//Считывание ответа на сообщение из позиции очереди
|
||||||
async getQueueResp(prms) {
|
async getQueueResp(prms) {
|
||||||
if (this.bConnected) {
|
if (this.bConnected) {
|
||||||
@ -760,6 +793,39 @@ class DBConnector extends EventEmitter {
|
|||||||
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
|
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//Запись параметров ответа на сообщение в позицию очереди
|
||||||
|
async setQueueOptionsResp(prms) {
|
||||||
|
if (this.bConnected) {
|
||||||
|
//Проверяем структуру переданных параметров
|
||||||
|
let sCheckResult = validateObject(
|
||||||
|
prms,
|
||||||
|
prmsDBConnectorSchema.setQueueOptionsResp,
|
||||||
|
"Параметры функции сохранения параметров ответа на сообщение позиции очереди"
|
||||||
|
);
|
||||||
|
//Если структура объекта в норме
|
||||||
|
if (!sCheckResult) {
|
||||||
|
//Подготовим параметры
|
||||||
|
let setQueueOptionsRespData = _.cloneDeep(prms);
|
||||||
|
setQueueOptionsRespData.connection = this.connection;
|
||||||
|
//Исполняем действие в БД
|
||||||
|
try {
|
||||||
|
let res = await this.connector.setQueueOptionsResp(setQueueOptionsRespData);
|
||||||
|
//Валидируем полученный ответ
|
||||||
|
sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена");
|
||||||
|
if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||||||
|
//Вернём измененную запись
|
||||||
|
return res;
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof ServerError) throw e;
|
||||||
|
else throw new ServerError(SERR_DB_EXECUTE, e.message);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
|
||||||
|
}
|
||||||
|
}
|
||||||
//Установить результат обработки записи сервером приложений
|
//Установить результат обработки записи сервером приложений
|
||||||
async setQueueAppSrvResult(prms) {
|
async setQueueAppSrvResult(prms) {
|
||||||
if (this.bConnected) {
|
if (this.bConnected) {
|
||||||
|
@ -195,6 +195,16 @@ exports.dbConnectorModule = new Schema({
|
|||||||
required: path => `Не реализована функция установки данных сообщения очереди (${path})`
|
required: path => `Не реализована функция установки данных сообщения очереди (${path})`
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
//Установка параметров сообщения очереди
|
||||||
|
setQueueOptions: {
|
||||||
|
use: { validateAsyncFunctionType },
|
||||||
|
required: true,
|
||||||
|
message: {
|
||||||
|
validateAsyncFunctionType: path =>
|
||||||
|
`Функция установки параметров сообщения очереди (${path}) имеет неверный формат (ожидалось - AsyncFunction)`,
|
||||||
|
required: path => `Не реализована функция установки параметров сообщения очереди (${path})`
|
||||||
|
}
|
||||||
|
},
|
||||||
//Считывание результата обработки сообщения очереди
|
//Считывание результата обработки сообщения очереди
|
||||||
getQueueResp: {
|
getQueueResp: {
|
||||||
use: { validateAsyncFunctionType },
|
use: { validateAsyncFunctionType },
|
||||||
@ -215,6 +225,17 @@ exports.dbConnectorModule = new Schema({
|
|||||||
required: path => `Не реализована функция установки результата обработки сообщения очереди (${path})`
|
required: path => `Не реализована функция установки результата обработки сообщения очереди (${path})`
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
//Установка параметров результата обработки сообщения очереди
|
||||||
|
setQueueOptionsResp: {
|
||||||
|
use: { validateAsyncFunctionType },
|
||||||
|
required: true,
|
||||||
|
message: {
|
||||||
|
validateAsyncFunctionType: path =>
|
||||||
|
`Функция установки параметров результата обработки сообщения очереди (${path}) имеет неверный формат (ожидалось - AsyncFunction)`,
|
||||||
|
required: path =>
|
||||||
|
`Не реализована функция установки параметров результата обработки сообщения очереди (${path})`
|
||||||
|
}
|
||||||
|
},
|
||||||
//Исполнение обработчика со стороны БД для сообщения очереди
|
//Исполнение обработчика со стороны БД для сообщения очереди
|
||||||
execQueuePrc: {
|
execQueuePrc: {
|
||||||
use: { validateAsyncFunctionType },
|
use: { validateAsyncFunctionType },
|
||||||
|
@ -257,6 +257,26 @@ exports.Queue = new Schema({
|
|||||||
`Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
`Идентификатор связанного сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||||
required: path => `Не указан идентификатор связанного сообщения очереди обмена (${path})`
|
required: path => `Не указан идентификатор связанного сообщения очереди обмена (${path})`
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
//Параметры сообщения
|
||||||
|
sOptions: {
|
||||||
|
type: String,
|
||||||
|
required: false,
|
||||||
|
message: {
|
||||||
|
type: path =>
|
||||||
|
`Параметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
|
||||||
|
required: path => `Не указаны параметры сообщения очереди обмена (${path})`
|
||||||
|
}
|
||||||
|
},
|
||||||
|
//Параметры ответа
|
||||||
|
sOptionsResp: {
|
||||||
|
type: String,
|
||||||
|
required: false,
|
||||||
|
message: {
|
||||||
|
type: path =>
|
||||||
|
`Параметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
|
||||||
|
required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})`
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -455,6 +455,29 @@ exports.setQueueMsg = new Schema({
|
|||||||
}
|
}
|
||||||
}).validator({ required: val => val === null || val });
|
}).validator({ required: val => val === null || val });
|
||||||
|
|
||||||
|
//Схема валидации параметров функции записи параметров сообщения в позицию очереди
|
||||||
|
exports.setQueueOptions = new Schema({
|
||||||
|
//Идентификатор позиции очереди
|
||||||
|
nQueueId: {
|
||||||
|
type: Number,
|
||||||
|
required: true,
|
||||||
|
message: {
|
||||||
|
type: path => `Идентификатор позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||||
|
required: path => `Не указан идентификатор позиции очереди (${path})`
|
||||||
|
}
|
||||||
|
},
|
||||||
|
//Параметры сообщения очереди обмена
|
||||||
|
sOptions: {
|
||||||
|
type: String,
|
||||||
|
required: true,
|
||||||
|
message: {
|
||||||
|
validateBuffer: path =>
|
||||||
|
`Парамметры сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
|
||||||
|
required: path => `Не указаны параметры сообщения очереди обмена (${path})`
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).validator({ required: val => val === null || val === 0 || val });
|
||||||
|
|
||||||
//Схема валидации параметров функции считывание ответа на сообщение из позиции очереди
|
//Схема валидации параметров функции считывание ответа на сообщение из позиции очереди
|
||||||
exports.getQueueResp = new Schema({
|
exports.getQueueResp = new Schema({
|
||||||
//Идентификатор позиции очереди
|
//Идентификатор позиции очереди
|
||||||
@ -503,6 +526,29 @@ exports.setQueueResp = new Schema({
|
|||||||
}
|
}
|
||||||
}).validator({ required: val => val === null || val === 0 || val });
|
}).validator({ required: val => val === null || val === 0 || val });
|
||||||
|
|
||||||
|
//Схема валидации параметров функции записи параметров ответа на сообщение в позицию очереди
|
||||||
|
exports.setQueueOptionsResp = new Schema({
|
||||||
|
//Идентификатор позиции очереди
|
||||||
|
nQueueId: {
|
||||||
|
type: Number,
|
||||||
|
required: true,
|
||||||
|
message: {
|
||||||
|
type: path => `Идентификатор позиции очереди (${path}) имеет некорректный тип данных (ожидалось - Number)`,
|
||||||
|
required: path => `Не указан идентификатор позиции очереди (${path})`
|
||||||
|
}
|
||||||
|
},
|
||||||
|
//Параметры ответа на сообщение очереди обмена
|
||||||
|
sOptionsResp: {
|
||||||
|
type: String,
|
||||||
|
required: true,
|
||||||
|
message: {
|
||||||
|
validateBuffer: path =>
|
||||||
|
`Парамметры ответа на сообщение очереди обмена (${path}) имеют некорректный тип данных (ожидалось - String)`,
|
||||||
|
required: path => `Не указаны параметры ответа на сообщение очереди обмена (${path})`
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).validator({ required: val => val === null || val === 0 || val });
|
||||||
|
|
||||||
//Схема валидации параметров функции установки результата обработки позиции очереди
|
//Схема валидации параметров функции установки результата обработки позиции очереди
|
||||||
exports.setQueueAppSrvResult = new Schema({
|
exports.setQueueAppSrvResult = new Schema({
|
||||||
//Идентификатор позиции очереди
|
//Идентификатор позиции очереди
|
||||||
|
@ -316,6 +316,25 @@ const setQueueMsg = async prms => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//Установка параметров сообщения очереди
|
||||||
|
const setQueueOptions = async prms => {
|
||||||
|
try {
|
||||||
|
let res = await prms.connection.execute(
|
||||||
|
"BEGIN PKG_EXS.QUEUE_OPTIONS_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS => :SOPTIONS, RCQUEUE => :RCQUEUE); END;",
|
||||||
|
{
|
||||||
|
NEXSQUEUE: prms.nQueueId,
|
||||||
|
SOPTIONS: prms.sOptions,
|
||||||
|
RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
|
||||||
|
},
|
||||||
|
{ outFormat: oracledb.OBJECT, autoCommit: true }
|
||||||
|
);
|
||||||
|
let rows = await readCursorData(res.outBinds.RCQUEUE);
|
||||||
|
return rows[0];
|
||||||
|
} catch (e) {
|
||||||
|
throw new Error(e.message);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
//Считывание результата обработки сообщения очереди
|
//Считывание результата обработки сообщения очереди
|
||||||
const getQueueResp = async prms => {
|
const getQueueResp = async prms => {
|
||||||
try {
|
try {
|
||||||
@ -358,6 +377,25 @@ const setQueueResp = async prms => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//Установка параметров результата обработки сообщения очереди
|
||||||
|
const setQueueOptionsResp = async prms => {
|
||||||
|
try {
|
||||||
|
let res = await prms.connection.execute(
|
||||||
|
"BEGIN PKG_EXS.QUEUE_OPTIONS_RESP_SET(NEXSQUEUE => :NEXSQUEUE, SOPTIONS_RESP => :SOPTIONS_RESP, RCQUEUE => :RCQUEUE); END;",
|
||||||
|
{
|
||||||
|
NEXSQUEUE: prms.nQueueId,
|
||||||
|
SOPTIONS_RESP: prms.sOptionsResp,
|
||||||
|
RCQUEUE: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
|
||||||
|
},
|
||||||
|
{ outFormat: oracledb.OBJECT, autoCommit: true }
|
||||||
|
);
|
||||||
|
let rows = await readCursorData(res.outBinds.RCQUEUE);
|
||||||
|
return rows[0];
|
||||||
|
} catch (e) {
|
||||||
|
throw new Error(e.message);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
//Исполнение обработчика со стороны БД для сообщения очереди
|
//Исполнение обработчика со стороны БД для сообщения очереди
|
||||||
const execQueuePrc = async prms => {
|
const execQueuePrc = async prms => {
|
||||||
try {
|
try {
|
||||||
@ -397,6 +435,8 @@ exports.getQueueOutgoing = getQueueOutgoing;
|
|||||||
exports.setQueueState = setQueueState;
|
exports.setQueueState = setQueueState;
|
||||||
exports.getQueueMsg = getQueueMsg;
|
exports.getQueueMsg = getQueueMsg;
|
||||||
exports.setQueueMsg = setQueueMsg;
|
exports.setQueueMsg = setQueueMsg;
|
||||||
|
exports.setQueueOptions = setQueueOptions;
|
||||||
exports.getQueueResp = getQueueResp;
|
exports.getQueueResp = getQueueResp;
|
||||||
exports.setQueueResp = setQueueResp;
|
exports.setQueueResp = setQueueResp;
|
||||||
|
exports.setQueueOptionsResp = setQueueOptionsResp;
|
||||||
exports.execQueuePrc = execQueuePrc;
|
exports.execQueuePrc = execQueuePrc;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user