Функции коннектора для раздельной установки тела сообщения и ответа на него

This commit is contained in:
Mikhail Chechnev 2018-12-15 16:08:53 +03:00
parent 25b7b10bb7
commit 5c5bf9a50e
2 changed files with 103 additions and 5 deletions

View File

@ -426,14 +426,14 @@ class DBConnector extends EventEmitter {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
} }
} }
//Установить результат обработки записи сервером приложений //Запись данных сообщения в позицию очереди
async setQueueAppSrvResult(prms) { async setQueueMsg(prms) {
if (this.bConnected) { if (this.bConnected) {
//Проверяем структуру переданных параметров //Проверяем структуру переданных параметров
let sCheckResult = validateObject( let sCheckResult = validateObject(
prms, prms,
prmsDBConnectorSchema.setQueueAppSrvResult, prmsDBConnectorSchema.setQueueMsg,
"Параметры функции установки результата обработки позиции очереди" "Параметры функции сохранения данных сообщения позиции очереди"
); );
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
@ -444,7 +444,36 @@ class DBConnector extends EventEmitter {
blMsg: prms.blMsg ? prms.blMsg : new Buffer(""), blMsg: prms.blMsg ? prms.blMsg : new Buffer(""),
connection: this.connection connection: this.connection
}); });
res = await this.connector.setQueueResp({ //Валидируем полученный ответ
sCheckResult = validateObject(res, objQueueSchema.Queue, "Изменённое сообщение очереди обмена");
if (sCheckResult) throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//Вернём измененную запись
return res;
} catch (e) {
if (e instanceof ServerError) throw e;
else throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} else {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Запись ответа на сообщение в позицию очереди
async setQueueResp(prms) {
if (this.bConnected) {
//Проверяем структуру переданных параметров
let sCheckResult = validateObject(
prms,
prmsDBConnectorSchema.setQueueResp,
"Параметры функции сохранения данных ответа на сообщение позиции очереди"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Исполняем действие в БД
try {
let res = await this.connector.setQueueResp({
nQueueId: prms.nQueueId, nQueueId: prms.nQueueId,
blResp: prms.blResp ? prms.blResp : new Buffer(""), blResp: prms.blResp ? prms.blResp : new Buffer(""),
connection: this.connection connection: this.connection
@ -465,6 +494,29 @@ class DBConnector extends EventEmitter {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД"); throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
} }
} }
//Установить результат обработки записи сервером приложений
async setQueueAppSrvResult(prms) {
if (this.bConnected) {
//Проверяем структуру переданных параметров
let sCheckResult = validateObject(
prms,
prmsDBConnectorSchema.setQueueAppSrvResult,
"Параметры функции установки результата обработки позиции очереди"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Исполняем действие в БД
let res = await this.setQueueMsg({ prms });
res = await this.setQueueResp({ prms });
//Вернём измененную запись
return res;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
} else {
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Исполнить обработчик со стороны БД //Исполнить обработчик со стороны БД
async execQueueDBPrc(prms) { async execQueueDBPrc(prms) {
if (this.bConnected) { if (this.bConnected) {

View File

@ -250,6 +250,52 @@ exports.setQueueState = new Schema({
} }
}); });
//Схема валидации параметров функции записи данных сообщения в позицию очереди
exports.setQueueMsg = new Schema({
//Идентификатор позиции очереди
nQueueId: {
type: Number,
required: true,
message: {
type: path => `Идентификатор позиции очереди ((${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор позиции очереди ((${path})`
}
},
//Данные сообщения очереди обмена
blMsg: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`,
required: path => `Не указаны данные сообщения очереди обмена (${path})`
}
}
}).validator({ required: val => val === null || val });
//Схема валидации параметров функции записи ответа на сообщение в позицию очереди
exports.setQueueResp = new Schema({
//Идентификатор позиции очереди
nQueueId: {
type: Number,
required: true,
message: {
type: path => `Идентификатор позиции очереди ((${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор позиции очереди ((${path})`
}
},
//Данные ответа сообщения очереди обмена
blResp: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`,
required: path => `Не указаны данные ответа сообщения очереди обмена (${path})`
}
}
}).validator({ required: val => val === null || val });
//Схема валидации параметров функции установки результата обработки позиции очереди //Схема валидации параметров функции установки результата обработки позиции очереди
exports.setQueueAppSrvResult = new Schema({ exports.setQueueAppSrvResult = new Schema({
//Идентификатор позиции очереди //Идентификатор позиции очереди