Переписана логика обработчиков - данные записей очереди (MSG BLOB и RESP BLOB) считываются и записываются только по необходимости. Это ускорило обмен с БД и сократило время обработки запросов в среднем в 2 раза

This commit is contained in:
Mikhail Chechnev 2018-12-15 22:33:36 +03:00
parent cc7970b998
commit 374d44d163
2 changed files with 47 additions and 18 deletions

View File

@ -78,7 +78,7 @@ class InQueue extends EventEmitter {
} }
//Обработка сообщения //Обработка сообщения
async processMessage(prms) { async processMessage(prms) {
//Проверяем структуру переданного объекта для старта //Проверяем структуру переданного объекта для обработки
let sCheckResult = validateObject( let sCheckResult = validateObject(
prms, prms,
prmsInQueueSchema.processMessage, prmsInQueueSchema.processMessage,
@ -89,9 +89,10 @@ class InQueue extends EventEmitter {
//Буфер для сообщения очереди //Буфер для сообщения очереди
let q = null; let q = null;
try { try {
//Определимся с телом сообщения //Тело сообщения и ответ на него
let blMsg = null; let blMsg = null;
//Для POST сообщений - это тело запроса let blResp = null;
//Определимся с телом сообщения - для POST сообщений - это тело запроса
if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) {
blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null;
} else { } else {
@ -122,6 +123,8 @@ class InQueue extends EventEmitter {
let resBefore = null; let resBefore = null;
try { try {
prms.queue = q; prms.queue = q;
prms.queue.blMsg = blMsg;
prms.queue.blResp = blResp;
resBefore = await fnBefore(prms); resBefore = await fnBefore(prms);
} catch (e) { } catch (e) {
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
@ -142,10 +145,17 @@ class InQueue extends EventEmitter {
}); });
//Фиксируем успех исполнения //Фиксируем успех исполнения
if (resBefore.blMsg) { if (resBefore.blMsg) {
q = await this.dbConn.setQueueAppSrvResult({ blMsg = resBefore.blMsg;
q = await this.dbConn.setQueueMsg({
nQueueId: q.nId, nQueueId: q.nId,
blMsg: resBefore.blMsg, blMsg
blResp: null });
}
if (resBefore.blResp) {
blResp = resBefore.blResp;
q = await this.dbConn.setQueueResp({
nQueueId: q.nId,
blResp
}); });
} }
} else { } else {
@ -168,6 +178,9 @@ class InQueue extends EventEmitter {
nQueueId: q.nId, nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
}); });
//Считаем ответ полученный от системы
let qData = await this.dbConn.getQueueResp({ nQueueId: prms.queue.nId });
blResp = qData.blResp;
} }
//Выполняем обработчик "После" (если он есть) //Выполняем обработчик "После" (если он есть)
if (prms.function.sAppSrvAfter) { if (prms.function.sAppSrvAfter) {
@ -181,6 +194,8 @@ class InQueue extends EventEmitter {
let resAfter = null; let resAfter = null;
try { try {
prms.queue = q; prms.queue = q;
prms.queue.blMsg = blMsg;
prms.queue.blResp = blResp;
resAfter = await fnAfter(prms); resAfter = await fnAfter(prms);
} catch (e) { } catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message); throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
@ -200,11 +215,13 @@ class InQueue extends EventEmitter {
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
}); });
//Фиксируем успех исполнения //Фиксируем успех исполнения
q = await this.dbConn.setQueueAppSrvResult({ if (resAfter.blResp) {
nQueueId: q.nId, blResp = resAfter.blResp;
blMsg: q.blMsg, q = await this.dbConn.setQueueResp({
blResp: resAfter.blResp nQueueId: q.nId,
}); blResp
});
}
} else { } else {
//Или расскажем об ошибке //Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -212,7 +229,7 @@ class InQueue extends EventEmitter {
} }
} }
//Всё успешно - отдаём результат клиенту //Всё успешно - отдаём результат клиенту
prms.res.status(200).send(q.blResp); prms.res.status(200).send(blResp);
//Фиксируем успех обработки - в статусе сообщения //Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({ q = await this.dbConn.setQueueState({
nQueueId: q.nId, nQueueId: q.nId,

View File

@ -12,11 +12,12 @@ const _ = require("lodash"); //Работа с массивами и объек
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const lg = require("./logger"); //Протоколирование работы const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД const db = require("./db_connector"); //Взаимодействие с БД
const { makeErrorText, validateObject, getAppSrvFunction } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject, getAppSrvFunction, buildURL } = require("./utils"); //Вспомогательные функции
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений
const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
const { SERR_OBJECT_BAD_INTERFACE, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER } = require("./constants"); //Глобальные константы const { SERR_OBJECT_BAD_INTERFACE, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER } = require("./constants"); //Глобальные константы
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
@ -87,12 +88,23 @@ const appProcess = async prms => {
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId } { nQueueId: prms.queue.nId }
); );
//Считаем тело сообщения
let qData = await dbConn.getQueueMsg({ nQueueId: prms.queue.nId });
//Кладём данные тела в объект сообщения и инициализируем поле для ответа
_.extend(prms.queue, { blMsg: qData.blMsg, blResp: null });
//Собираем параметры для передачи серверу //Собираем параметры для передачи серверу
let options = { let options = { method: prms.service.sFnPrmsType };
url: `${prms.service.sSrvRoot}/${prms.function.sFnURL}`, //Определимся с URL и телом сообщения в зависимости от способа передачи параметров
method: prms.service.sFnPrmsType, if (prms.service.sFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) {
body: prms.queue.sMsg options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL });
}; options.body = prms.queue.blMsg;
} else {
options.url = buildURL({
sSrvRoot: prms.service.sSrvRoot,
sFnURL: prms.function.sFnURL,
sQuery: prms.queue.blMsg.toString()
});
}
//Выполняем обработчик "До" (если он есть) //Выполняем обработчик "До" (если он есть)
if (prms.function.sAppSrvBefore) { if (prms.function.sAppSrvBefore) {
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);